[PeerMgr] Add Caching and Re-processing system (#263)

[peermgr]

- Add request cache with tests
- Add requestCache to peermgr
- refactored peer manager tests
- Added blockInfo struct, to allow sorting on the blockIndex
- added helper methods for cache, pickItem, pickFirstItem, removeHash,
findHash and refactored tests
- renamed requestcache to blockcache
- refactored peer manager to use block cache for block requests *only*
- added blockCallPeer function to handle block requests only
- refactored onDisconnect to add back any pending peer requests that the
disconnected peer did not complete into the peer manager queue

[peermgr/server]

- Modify onBlock handler in server, to send peermgr a BlockInfo struct

[peermgr/syncmgr/server]

- Modified blockIndex in BlockInfo to be uint32 and not uint64
- RequestBlocks in syncmgr now takes an index along with the hash
- modified syncmgr code to pass index along with hash in all methods
This commit is contained in:
decentralisedkev 2019-03-30 18:10:27 +00:00 committed by GitHub
parent 1a6bdd4099
commit abb4da9cbd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 407 additions and 35 deletions

155
pkg/peermgr/blockcache.go Normal file
View file

@ -0,0 +1,155 @@
package peermgr
import (
"errors"
"sort"
"sync"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
var (
//ErrCacheLimit is returned when the cache limit is reached
ErrCacheLimit = errors.New("nomore items can be added to the cache")
//ErrNoItems is returned when pickItem is called and there are no items in the cache
ErrNoItems = errors.New("there are no items in the cache")
//ErrDuplicateItem is returned when you try to add the same item, more than once to the cache
ErrDuplicateItem = errors.New("this item is already in the cache")
)
//BlockInfo holds the necessary information that the cache needs
// to sort and store block requests
type BlockInfo struct {
BlockHash util.Uint256
BlockIndex uint32
}
// Equals returns true if two blockInfo objects
// have the same hash and the same index
func (bi *BlockInfo) Equals(other BlockInfo) bool {
return bi.BlockHash.Equals(other.BlockHash) && bi.BlockIndex == other.BlockIndex
}
// indexSorter sorts the blockInfos by blockIndex.
type indexSorter []BlockInfo
func (is indexSorter) Len() int { return len(is) }
func (is indexSorter) Swap(i, j int) { is[i], is[j] = is[j], is[i] }
func (is indexSorter) Less(i, j int) bool { return is[i].BlockIndex < is[j].BlockIndex }
//blockCache will cache any pending block requests
// for the node when there are no available nodes
type blockCache struct {
cacheLimit int
cacheLock sync.Mutex
cache []BlockInfo
}
func newBlockCache(cacheLimit int) *blockCache {
return &blockCache{
cache: make([]BlockInfo, 0, cacheLimit),
cacheLimit: cacheLimit,
}
}
func (bc *blockCache) addBlockInfo(bi BlockInfo) error {
if bc.cacheLen() == bc.cacheLimit {
return ErrCacheLimit
}
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
// Check for duplicates. slice will always be small so a simple for loop will work
for _, bInfo := range bc.cache {
if bInfo.Equals(bi) {
return ErrDuplicateItem
}
}
bc.cache = append(bc.cache, bi)
sort.Sort(indexSorter(bc.cache))
return nil
}
func (bc *blockCache) addBlockInfos(bis []BlockInfo) error {
if len(bis)+bc.cacheLen() > bc.cacheLimit {
return errors.New("too many items to add, this will exceed the cache limit")
}
for _, bi := range bis {
err := bc.addBlockInfo(bi)
if err != nil {
return err
}
}
return nil
}
func (bc *blockCache) cacheLen() int {
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
return len(bc.cache)
}
func (bc *blockCache) pickFirstItem() (BlockInfo, error) {
return bc.pickItem(0)
}
func (bc *blockCache) pickAllItems() ([]BlockInfo, error) {
numOfItems := bc.cacheLen()
items := make([]BlockInfo, 0, numOfItems)
for i := 0; i < numOfItems; i++ {
bi, err := bc.pickFirstItem()
if err != nil {
return nil, err
}
items = append(items, bi)
}
return items, nil
}
func (bc *blockCache) pickItem(i uint) (BlockInfo, error) {
if bc.cacheLen() < 1 {
return BlockInfo{}, ErrNoItems
}
if i >= uint(bc.cacheLen()) {
return BlockInfo{}, errors.New("index out of range")
}
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
item := bc.cache[i]
bc.cache = append(bc.cache[:i], bc.cache[i+1:]...)
return item, nil
}
func (bc *blockCache) removeHash(hashToRemove util.Uint256) error {
index, err := bc.findHash(hashToRemove)
if err != nil {
return err
}
_, err = bc.pickItem(uint(index))
return err
}
func (bc *blockCache) findHash(hashToFind util.Uint256) (int, error) {
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
for i, bInfo := range bc.cache {
if bInfo.BlockHash.Equals(hashToFind) {
return i, nil
}
}
return -1, errors.New("hash cannot be found in the cache")
}

View file

@ -0,0 +1,80 @@
package peermgr
import (
"math/rand"
"testing"
"github.com/CityOfZion/neo-go/pkg/wire/util"
"github.com/stretchr/testify/assert"
)
func TestAddBlock(t *testing.T) {
bc := &blockCache{
cacheLimit: 20,
}
bi := randomBlockInfo(t)
err := bc.addBlockInfo(bi)
assert.Equal(t, nil, err)
assert.Equal(t, 1, bc.cacheLen())
err = bc.addBlockInfo(bi)
assert.Equal(t, ErrDuplicateItem, err)
assert.Equal(t, 1, bc.cacheLen())
}
func TestCacheLimit(t *testing.T) {
bc := &blockCache{
cacheLimit: 20,
}
for i := 0; i < bc.cacheLimit; i++ {
err := bc.addBlockInfo(randomBlockInfo(t))
assert.Equal(t, nil, err)
}
err := bc.addBlockInfo(randomBlockInfo(t))
assert.Equal(t, ErrCacheLimit, err)
assert.Equal(t, bc.cacheLimit, bc.cacheLen())
}
func TestPickItem(t *testing.T) {
bc := &blockCache{
cacheLimit: 20,
}
for i := 0; i < bc.cacheLimit; i++ {
err := bc.addBlockInfo(randomBlockInfo(t))
assert.Equal(t, nil, err)
}
for i := 0; i < bc.cacheLimit; i++ {
_, err := bc.pickFirstItem()
assert.Equal(t, nil, err)
}
assert.Equal(t, 0, bc.cacheLen())
}
func randomUint256(t *testing.T) util.Uint256 {
rand32 := make([]byte, 32)
rand.Read(rand32)
u, err := util.Uint256DecodeBytes(rand32)
assert.Equal(t, nil, err)
return u
}
func randomBlockInfo(t *testing.T) BlockInfo {
return BlockInfo{
randomUint256(t),
rand.Uint32(),
}
}

View file

@ -2,6 +2,7 @@ package peermgr
import (
"errors"
"fmt"
"sync"
"github.com/CityOfZion/neo-go/pkg/wire/command"
@ -9,6 +10,15 @@ import (
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
const (
// blockCacheLimit is the maximum amount of pending requests that the cache can hold
pendingBlockCacheLimit = 20
//peerBlockCacheLimit is the maximum amount of inflight blocks that a peer can
// have, before they are flagged as busy
peerBlockCacheLimit = 1
)
var (
//ErrNoAvailablePeers is returned when a request for data from a peer is invoked
// but there are no available peers to request data from
@ -28,6 +38,10 @@ type mPeer interface {
}
type peerstats struct {
// when a peer is sent a blockRequest
// the peermanager will track this using this blockCache
blockCache *blockCache
// all other requests will be tracked using the requests map
requests map[command.Type]bool
}
@ -35,12 +49,15 @@ type peerstats struct {
type PeerMgr struct {
pLock sync.RWMutex
peers map[mPeer]peerstats
requestCache *blockCache
}
//New returns a new peermgr object
func New() *PeerMgr {
return &PeerMgr{
peers: make(map[mPeer]peerstats),
peers: make(map[mPeer]peerstats),
requestCache: newBlockCache(pendingBlockCacheLimit),
}
}
@ -52,7 +69,10 @@ func (pmgr *PeerMgr) AddPeer(peer mPeer) {
if _, exists := pmgr.peers[peer]; exists {
return
}
pmgr.peers[peer] = peerstats{requests: make(map[command.Type]bool)}
pmgr.peers[peer] = peerstats{
requests: make(map[command.Type]bool),
blockCache: newBlockCache(peerBlockCacheLimit),
}
go pmgr.onDisconnect(peer)
}
@ -61,6 +81,8 @@ func (pmgr *PeerMgr) AddPeer(peer mPeer) {
func (pmgr *PeerMgr) MsgReceived(peer mPeer, cmd command.Type) error {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
// if peer was unknown then disconnect
val, ok := pmgr.peers[peer]
if !ok {
@ -76,6 +98,44 @@ func (pmgr *PeerMgr) MsgReceived(peer mPeer, cmd command.Type) error {
return nil
}
//BlockMsgReceived notifies the peer manager that we have received a
// block message from a peer
func (pmgr *PeerMgr) BlockMsgReceived(peer mPeer, bi BlockInfo) error {
// if peer was unknown then disconnect
val, ok := pmgr.peers[peer]
if !ok {
go func() {
peer.NotifyDisconnect()
}()
peer.Disconnect()
return ErrUnknownPeer
}
// // remove item from the peersBlock cache
err := val.blockCache.removeHash(bi.BlockHash)
if err != nil {
return err
}
// check if cache empty, if so then return
if pmgr.requestCache.cacheLen() == 0 {
return nil
}
// Try to clean an item from the pendingBlockCache, a peer has just finished serving a block request
cachedBInfo, err := pmgr.requestCache.pickFirstItem()
if err != nil {
return err
}
return pmgr.blockCallPeer(cachedBInfo, func(p mPeer) error {
return p.RequestBlocks([]util.Uint256{cachedBInfo.BlockHash})
})
}
// Len returns the amount of peers that the peer manager
//currently knows about
func (pmgr *PeerMgr) Len() int {
@ -84,25 +144,34 @@ func (pmgr *PeerMgr) Len() int {
return len(pmgr.peers)
}
// RequestBlock will request a block from the most
// RequestBlock will request a block from the most
// available peer. Then update it's stats, so we know that
// this peer is busy
func (pmgr *PeerMgr) RequestBlock(hash util.Uint256) error {
return pmgr.callPeerForCmd(command.Block, func(p mPeer) error {
return p.RequestBlocks([]util.Uint256{hash})
func (pmgr *PeerMgr) RequestBlock(bi BlockInfo) error {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
err := pmgr.blockCallPeer(bi, func(p mPeer) error {
return p.RequestBlocks([]util.Uint256{bi.BlockHash})
})
if err == ErrNoAvailablePeers {
return pmgr.requestCache.addBlockInfo(bi)
}
return err
}
// RequestHeaders will request a headers from the most available peer.
func (pmgr *PeerMgr) RequestHeaders(hash util.Uint256) error {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
return pmgr.callPeerForCmd(command.Headers, func(p mPeer) error {
return p.RequestHeaders(hash)
})
}
func (pmgr *PeerMgr) callPeerForCmd(cmd command.Type, f func(p mPeer) error) error {
pmgr.pLock.Lock()
defer pmgr.pLock.Unlock()
for peer, stats := range pmgr.peers {
if !stats.requests[cmd] {
stats.requests[cmd] = true
@ -111,12 +180,48 @@ func (pmgr *PeerMgr) callPeerForCmd(cmd command.Type, f func(p mPeer) error) err
}
return ErrNoAvailablePeers
}
func (pmgr *PeerMgr) blockCallPeer(bi BlockInfo, f func(p mPeer) error) error {
for peer, stats := range pmgr.peers {
if stats.blockCache.cacheLen() < peerBlockCacheLimit {
err := stats.blockCache.addBlockInfo(bi)
if err != nil {
return err
}
return f(peer)
}
}
return ErrNoAvailablePeers
}
func (pmgr *PeerMgr) onDisconnect(p mPeer) {
// Blocking until peer is disconnected
p.NotifyDisconnect()
pmgr.pLock.Lock()
delete(pmgr.peers, p)
pmgr.pLock.Unlock()
defer func() {
delete(pmgr.peers, p)
pmgr.pLock.Unlock()
}()
// Add all of peers outstanding block requests into
// the peer managers pendingBlockRequestCache
val, ok := pmgr.peers[p]
if !ok {
return
}
pendingRequests, err := val.blockCache.pickAllItems()
if err != nil {
fmt.Println(err.Error())
return
}
err = pmgr.requestCache.addBlockInfos(pendingRequests)
if err != nil {
fmt.Println(err.Error())
return
}
}

View file

@ -4,7 +4,6 @@ import (
"testing"
"github.com/CityOfZion/neo-go/pkg/wire/command"
"github.com/CityOfZion/neo-go/pkg/wire/util"
"github.com/stretchr/testify/assert"
)
@ -59,42 +58,65 @@ func TestRequestBlocks(t *testing.T) {
pmgr.AddPeer(peerB)
pmgr.AddPeer(peerC)
err := pmgr.RequestBlock(util.Uint256{})
firstBlock := randomBlockInfo(t)
err := pmgr.RequestBlock(firstBlock)
assert.Nil(t, err)
err = pmgr.RequestBlock(util.Uint256{})
secondBlock := randomBlockInfo(t)
err = pmgr.RequestBlock(secondBlock)
assert.Nil(t, err)
err = pmgr.RequestBlock(util.Uint256{})
thirdBlock := randomBlockInfo(t)
err = pmgr.RequestBlock(thirdBlock)
assert.Nil(t, err)
// Since the peer manager did not get a MsgReceived
// in between the block requests
// a request should be sent to all peers
// This is only true, if peerBlockCacheLimit == 1
assert.Equal(t, 1, peerA.blockRequested)
assert.Equal(t, 1, peerB.blockRequested)
assert.Equal(t, 1, peerC.blockRequested)
// Since the peer manager still has not received a MsgReceived
// another call to request blocks, will return a NoAvailablePeerError
// another call to request blocks, will add the request to the cache
// and return a nil err
err = pmgr.RequestBlock(util.Uint256{})
assert.Equal(t, ErrNoAvailablePeers, err)
fourthBlock := randomBlockInfo(t)
err = pmgr.RequestBlock(fourthBlock)
assert.Equal(t, nil, err)
assert.Equal(t, 1, pmgr.requestCache.cacheLen())
// If we tell the peer manager that peerA has given us a block
// then send another BlockRequest. It will go to peerA
// since the other two peers are still busy with their
// block requests
// If we tell the peer manager that we have received a block
// it will check the cache for any pending requests and send a block request if there are any.
// The request will go to the peer who sent back the block corresponding to the first hash
// since the other two peers are still busy with their block requests
pmgr.MsgReceived(peerA, command.Block)
err = pmgr.RequestBlock(util.Uint256{})
peer := findPeerwithHash(t, pmgr, firstBlock.BlockHash)
err = pmgr.BlockMsgReceived(peer, firstBlock)
assert.Nil(t, err)
assert.Equal(t, 2, peerA.blockRequested)
assert.Equal(t, 1, peerB.blockRequested)
assert.Equal(t, 1, peerC.blockRequested)
totalRequests := peerA.blockRequested + peerB.blockRequested + peerC.blockRequested
assert.Equal(t, 4, totalRequests)
// // cache should be empty now
assert.Equal(t, 0, pmgr.requestCache.cacheLen())
}
// The peer manager does not tell you what peer was sent a particular block request
// For testing purposes, the following function will find that peer
func findPeerwithHash(t *testing.T, pmgr *PeerMgr, blockHash util.Uint256) mPeer {
for peer, stats := range pmgr.peers {
_, err := stats.blockCache.findHash(blockHash)
if err == nil {
return peer
}
}
assert.Fail(t, "cannot find a peer with that hash")
return nil
}
func TestRequestHeaders(t *testing.T) {
pmgr := New()
@ -152,7 +174,7 @@ func TestUnknownPeer(t *testing.T) {
quit: make(chan bool),
}
err := pmgr.MsgReceived(unknownPeer, command.Block)
err := pmgr.MsgReceived(unknownPeer, command.Headers)
assert.Equal(t, true, unknownPeer.disconnected)
assert.Equal(t, ErrUnknownPeer, err)
}

View file

@ -3,6 +3,8 @@ package server
import (
"encoding/binary"
"github.com/CityOfZion/neo-go/pkg/peermgr"
"github.com/CityOfZion/neo-go/pkg/peer"
"github.com/CityOfZion/neo-go/pkg/syncmgr"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
@ -34,7 +36,10 @@ func (s *Server) onHeader(peer *peer.Peer, hdrsMessage *payload.HeadersMessage)
}
func (s *Server) onBlock(peer *peer.Peer, blockMsg *payload.BlockMessage) {
s.pmg.MsgReceived(peer, blockMsg.Command())
s.pmg.BlockMsgReceived(peer, peermgr.BlockInfo{
BlockHash: blockMsg.Hash,
BlockIndex: blockMsg.Index,
})
s.smg.OnBlock(peer, blockMsg)
}
@ -50,8 +55,11 @@ func (s *Server) requestHeaders(hash util.Uint256) error {
return s.pmg.RequestHeaders(hash)
}
func (s *Server) requestBlock(hash util.Uint256) error {
return s.pmg.RequestBlock(hash)
func (s *Server) requestBlock(hash util.Uint256, index uint32) error {
return s.pmg.RequestBlock(peermgr.BlockInfo{
BlockHash: hash,
BlockIndex: index,
})
}
// getNextBlockHash searches the database for the blockHash

View file

@ -32,7 +32,7 @@ func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error {
if err != nil {
return err
}
err = s.cfg.RequestBlock(nextHash)
err = s.cfg.RequestBlock(nextHash, block.Index)
return err
}

View file

@ -18,7 +18,7 @@ type Config struct {
//RequestBlock will send a getdata request for the block
// with the hash passed as a parameter
RequestBlock func(hash util.Uint256) error
RequestBlock func(hash util.Uint256, index uint32) error
// GetNextBlockHash returns the block hash of the header infront of thr block
// at the tip of this nodes chain. This assumes that the node is not in sync

View file

@ -12,6 +12,7 @@ func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase)
// Note: For the un-optimised version, we move straight to blocksOnly mode
firstHash := hdrs[0].Hash
firstHdrIndex := hdrs[0].Index
err := s.cfg.ProcessHeaders(hdrs)
if err == nil {
@ -19,7 +20,7 @@ func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase)
s.headerHash = hdrs[len(hdrs)-1].Hash
s.syncmode = blockMode
return s.cfg.RequestBlock(firstHash)
return s.cfg.RequestBlock(firstHash, firstHdrIndex)
}
// Check whether it is a validation error, or a database error

View file

@ -46,7 +46,7 @@ func (s *syncTestHelper) FetchBlockAgain(util.Uint256) error {
return s.err
}
func (s *syncTestHelper) RequestBlock(util.Uint256) error {
func (s *syncTestHelper) RequestBlock(util.Uint256, uint32) error {
s.blockFetchRequest++
return s.err
}

View file

@ -16,6 +16,7 @@ func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase)
lenHeaders := len(hdrs)
firstHash := hdrs[0].Hash
firstHdrIndex := hdrs[0].Index
lastHash := hdrs[lenHeaders-1].Hash
// Update syncmgr latest header
@ -32,7 +33,7 @@ func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase)
// Bounds state that len > 1 && len!= 2000 & maxHeadersInMessage == 2000
// This means that we have less than 2k headers
s.syncmode = blockMode
return s.cfg.RequestBlock(firstHash)
return s.cfg.RequestBlock(firstHash, firstHdrIndex)
}
// normalModeOnBlock is called when the sync manager is normal mode