From abb4da9cbd615c062979cf544ba1d1c891176160 Mon Sep 17 00:00:00 2001 From: decentralisedkev <37423678+decentralisedkev@users.noreply.github.com> Date: Sat, 30 Mar 2019 18:10:27 +0000 Subject: [PATCH] [PeerMgr] Add Caching and Re-processing system (#263) [peermgr] - Add request cache with tests - Add requestCache to peermgr - refactored peer manager tests - Added blockInfo struct, to allow sorting on the blockIndex - added helper methods for cache, pickItem, pickFirstItem, removeHash, findHash and refactored tests - renamed requestcache to blockcache - refactored peer manager to use block cache for block requests *only* - added blockCallPeer function to handle block requests only - refactored onDisconnect to add back any pending peer requests that the disconnected peer did not complete into the peer manager queue [peermgr/server] - Modify onBlock handler in server, to send peermgr a BlockInfo struct [peermgr/syncmgr/server] - Modified blockIndex in BlockInfo to be uint32 and not uint64 - RequestBlocks in syncmgr now takes an index along with the hash - modified syncmgr code to pass index along with hash in all methods --- pkg/peermgr/blockcache.go | 155 ++++++++++++++++++++++++++++++++ pkg/peermgr/blockcache_test.go | 80 +++++++++++++++++ pkg/peermgr/peermgr.go | 125 +++++++++++++++++++++++--- pkg/peermgr/peermgr_test.go | 56 ++++++++---- pkg/server/syncmgr.go | 14 ++- pkg/syncmgr/blockmode.go | 2 +- pkg/syncmgr/config.go | 2 +- pkg/syncmgr/headermode.go | 3 +- pkg/syncmgr/mockhelpers_test.go | 2 +- pkg/syncmgr/normalmode.go | 3 +- 10 files changed, 407 insertions(+), 35 deletions(-) create mode 100644 pkg/peermgr/blockcache.go create mode 100644 pkg/peermgr/blockcache_test.go diff --git a/pkg/peermgr/blockcache.go b/pkg/peermgr/blockcache.go new file mode 100644 index 000000000..8e06b8251 --- /dev/null +++ b/pkg/peermgr/blockcache.go @@ -0,0 +1,155 @@ +package peermgr + +import ( + "errors" + "sort" + "sync" + + "github.com/CityOfZion/neo-go/pkg/wire/util" +) + +var ( + //ErrCacheLimit is returned when the cache limit is reached + ErrCacheLimit = errors.New("nomore items can be added to the cache") + + //ErrNoItems is returned when pickItem is called and there are no items in the cache + ErrNoItems = errors.New("there are no items in the cache") + + //ErrDuplicateItem is returned when you try to add the same item, more than once to the cache + ErrDuplicateItem = errors.New("this item is already in the cache") +) + +//BlockInfo holds the necessary information that the cache needs +// to sort and store block requests +type BlockInfo struct { + BlockHash util.Uint256 + BlockIndex uint32 +} + +// Equals returns true if two blockInfo objects +// have the same hash and the same index +func (bi *BlockInfo) Equals(other BlockInfo) bool { + return bi.BlockHash.Equals(other.BlockHash) && bi.BlockIndex == other.BlockIndex +} + +// indexSorter sorts the blockInfos by blockIndex. +type indexSorter []BlockInfo + +func (is indexSorter) Len() int { return len(is) } +func (is indexSorter) Swap(i, j int) { is[i], is[j] = is[j], is[i] } +func (is indexSorter) Less(i, j int) bool { return is[i].BlockIndex < is[j].BlockIndex } + +//blockCache will cache any pending block requests +// for the node when there are no available nodes +type blockCache struct { + cacheLimit int + cacheLock sync.Mutex + cache []BlockInfo +} + +func newBlockCache(cacheLimit int) *blockCache { + return &blockCache{ + cache: make([]BlockInfo, 0, cacheLimit), + cacheLimit: cacheLimit, + } +} + +func (bc *blockCache) addBlockInfo(bi BlockInfo) error { + if bc.cacheLen() == bc.cacheLimit { + return ErrCacheLimit + } + + bc.cacheLock.Lock() + defer bc.cacheLock.Unlock() + + // Check for duplicates. slice will always be small so a simple for loop will work + for _, bInfo := range bc.cache { + if bInfo.Equals(bi) { + return ErrDuplicateItem + } + } + bc.cache = append(bc.cache, bi) + + sort.Sort(indexSorter(bc.cache)) + + return nil +} + +func (bc *blockCache) addBlockInfos(bis []BlockInfo) error { + + if len(bis)+bc.cacheLen() > bc.cacheLimit { + return errors.New("too many items to add, this will exceed the cache limit") + } + + for _, bi := range bis { + err := bc.addBlockInfo(bi) + if err != nil { + return err + } + } + return nil +} + +func (bc *blockCache) cacheLen() int { + bc.cacheLock.Lock() + defer bc.cacheLock.Unlock() + return len(bc.cache) +} + +func (bc *blockCache) pickFirstItem() (BlockInfo, error) { + return bc.pickItem(0) +} + +func (bc *blockCache) pickAllItems() ([]BlockInfo, error) { + + numOfItems := bc.cacheLen() + + items := make([]BlockInfo, 0, numOfItems) + + for i := 0; i < numOfItems; i++ { + bi, err := bc.pickFirstItem() + if err != nil { + return nil, err + } + items = append(items, bi) + } + return items, nil +} + +func (bc *blockCache) pickItem(i uint) (BlockInfo, error) { + if bc.cacheLen() < 1 { + return BlockInfo{}, ErrNoItems + } + + if i >= uint(bc.cacheLen()) { + return BlockInfo{}, errors.New("index out of range") + } + + bc.cacheLock.Lock() + defer bc.cacheLock.Unlock() + + item := bc.cache[i] + bc.cache = append(bc.cache[:i], bc.cache[i+1:]...) + return item, nil +} + +func (bc *blockCache) removeHash(hashToRemove util.Uint256) error { + index, err := bc.findHash(hashToRemove) + if err != nil { + return err + } + + _, err = bc.pickItem(uint(index)) + return err +} + +func (bc *blockCache) findHash(hashToFind util.Uint256) (int, error) { + bc.cacheLock.Lock() + defer bc.cacheLock.Unlock() + for i, bInfo := range bc.cache { + if bInfo.BlockHash.Equals(hashToFind) { + return i, nil + } + } + return -1, errors.New("hash cannot be found in the cache") +} diff --git a/pkg/peermgr/blockcache_test.go b/pkg/peermgr/blockcache_test.go new file mode 100644 index 000000000..3cb928e9e --- /dev/null +++ b/pkg/peermgr/blockcache_test.go @@ -0,0 +1,80 @@ +package peermgr + +import ( + "math/rand" + "testing" + + "github.com/CityOfZion/neo-go/pkg/wire/util" + "github.com/stretchr/testify/assert" +) + +func TestAddBlock(t *testing.T) { + + bc := &blockCache{ + cacheLimit: 20, + } + bi := randomBlockInfo(t) + + err := bc.addBlockInfo(bi) + assert.Equal(t, nil, err) + + assert.Equal(t, 1, bc.cacheLen()) + + err = bc.addBlockInfo(bi) + assert.Equal(t, ErrDuplicateItem, err) + + assert.Equal(t, 1, bc.cacheLen()) +} + +func TestCacheLimit(t *testing.T) { + + bc := &blockCache{ + cacheLimit: 20, + } + + for i := 0; i < bc.cacheLimit; i++ { + err := bc.addBlockInfo(randomBlockInfo(t)) + assert.Equal(t, nil, err) + } + + err := bc.addBlockInfo(randomBlockInfo(t)) + assert.Equal(t, ErrCacheLimit, err) + + assert.Equal(t, bc.cacheLimit, bc.cacheLen()) +} +func TestPickItem(t *testing.T) { + + bc := &blockCache{ + cacheLimit: 20, + } + + for i := 0; i < bc.cacheLimit; i++ { + err := bc.addBlockInfo(randomBlockInfo(t)) + assert.Equal(t, nil, err) + } + + for i := 0; i < bc.cacheLimit; i++ { + _, err := bc.pickFirstItem() + assert.Equal(t, nil, err) + } + + assert.Equal(t, 0, bc.cacheLen()) +} + +func randomUint256(t *testing.T) util.Uint256 { + rand32 := make([]byte, 32) + rand.Read(rand32) + + u, err := util.Uint256DecodeBytes(rand32) + assert.Equal(t, nil, err) + + return u +} + +func randomBlockInfo(t *testing.T) BlockInfo { + + return BlockInfo{ + randomUint256(t), + rand.Uint32(), + } +} diff --git a/pkg/peermgr/peermgr.go b/pkg/peermgr/peermgr.go index 047a109c3..e83e37768 100644 --- a/pkg/peermgr/peermgr.go +++ b/pkg/peermgr/peermgr.go @@ -2,6 +2,7 @@ package peermgr import ( "errors" + "fmt" "sync" "github.com/CityOfZion/neo-go/pkg/wire/command" @@ -9,6 +10,15 @@ import ( "github.com/CityOfZion/neo-go/pkg/wire/util" ) +const ( + // blockCacheLimit is the maximum amount of pending requests that the cache can hold + pendingBlockCacheLimit = 20 + + //peerBlockCacheLimit is the maximum amount of inflight blocks that a peer can + // have, before they are flagged as busy + peerBlockCacheLimit = 1 +) + var ( //ErrNoAvailablePeers is returned when a request for data from a peer is invoked // but there are no available peers to request data from @@ -28,6 +38,10 @@ type mPeer interface { } type peerstats struct { + // when a peer is sent a blockRequest + // the peermanager will track this using this blockCache + blockCache *blockCache + // all other requests will be tracked using the requests map requests map[command.Type]bool } @@ -35,12 +49,15 @@ type peerstats struct { type PeerMgr struct { pLock sync.RWMutex peers map[mPeer]peerstats + + requestCache *blockCache } //New returns a new peermgr object func New() *PeerMgr { return &PeerMgr{ - peers: make(map[mPeer]peerstats), + peers: make(map[mPeer]peerstats), + requestCache: newBlockCache(pendingBlockCacheLimit), } } @@ -52,7 +69,10 @@ func (pmgr *PeerMgr) AddPeer(peer mPeer) { if _, exists := pmgr.peers[peer]; exists { return } - pmgr.peers[peer] = peerstats{requests: make(map[command.Type]bool)} + pmgr.peers[peer] = peerstats{ + requests: make(map[command.Type]bool), + blockCache: newBlockCache(peerBlockCacheLimit), + } go pmgr.onDisconnect(peer) } @@ -61,6 +81,8 @@ func (pmgr *PeerMgr) AddPeer(peer mPeer) { func (pmgr *PeerMgr) MsgReceived(peer mPeer, cmd command.Type) error { pmgr.pLock.Lock() defer pmgr.pLock.Unlock() + + // if peer was unknown then disconnect val, ok := pmgr.peers[peer] if !ok { @@ -76,6 +98,44 @@ func (pmgr *PeerMgr) MsgReceived(peer mPeer, cmd command.Type) error { return nil } +//BlockMsgReceived notifies the peer manager that we have received a +// block message from a peer +func (pmgr *PeerMgr) BlockMsgReceived(peer mPeer, bi BlockInfo) error { + + // if peer was unknown then disconnect + val, ok := pmgr.peers[peer] + if !ok { + + go func() { + peer.NotifyDisconnect() + }() + + peer.Disconnect() + return ErrUnknownPeer + } + + // // remove item from the peersBlock cache + err := val.blockCache.removeHash(bi.BlockHash) + if err != nil { + return err + } + + // check if cache empty, if so then return + if pmgr.requestCache.cacheLen() == 0 { + return nil + } + + // Try to clean an item from the pendingBlockCache, a peer has just finished serving a block request + cachedBInfo, err := pmgr.requestCache.pickFirstItem() + if err != nil { + return err + } + + return pmgr.blockCallPeer(cachedBInfo, func(p mPeer) error { + return p.RequestBlocks([]util.Uint256{cachedBInfo.BlockHash}) + }) +} + // Len returns the amount of peers that the peer manager //currently knows about func (pmgr *PeerMgr) Len() int { @@ -84,25 +144,34 @@ func (pmgr *PeerMgr) Len() int { return len(pmgr.peers) } -// RequestBlock will request a block from the most +// RequestBlock will request a block from the most // available peer. Then update it's stats, so we know that // this peer is busy -func (pmgr *PeerMgr) RequestBlock(hash util.Uint256) error { - return pmgr.callPeerForCmd(command.Block, func(p mPeer) error { - return p.RequestBlocks([]util.Uint256{hash}) +func (pmgr *PeerMgr) RequestBlock(bi BlockInfo) error { + pmgr.pLock.Lock() + defer pmgr.pLock.Unlock() + + err := pmgr.blockCallPeer(bi, func(p mPeer) error { + return p.RequestBlocks([]util.Uint256{bi.BlockHash}) }) + + if err == ErrNoAvailablePeers { + return pmgr.requestCache.addBlockInfo(bi) + } + + return err } // RequestHeaders will request a headers from the most available peer. func (pmgr *PeerMgr) RequestHeaders(hash util.Uint256) error { + pmgr.pLock.Lock() + defer pmgr.pLock.Unlock() return pmgr.callPeerForCmd(command.Headers, func(p mPeer) error { return p.RequestHeaders(hash) }) } func (pmgr *PeerMgr) callPeerForCmd(cmd command.Type, f func(p mPeer) error) error { - pmgr.pLock.Lock() - defer pmgr.pLock.Unlock() for peer, stats := range pmgr.peers { if !stats.requests[cmd] { stats.requests[cmd] = true @@ -111,12 +180,48 @@ func (pmgr *PeerMgr) callPeerForCmd(cmd command.Type, f func(p mPeer) error) err } return ErrNoAvailablePeers } + +func (pmgr *PeerMgr) blockCallPeer(bi BlockInfo, f func(p mPeer) error) error { + for peer, stats := range pmgr.peers { + if stats.blockCache.cacheLen() < peerBlockCacheLimit { + err := stats.blockCache.addBlockInfo(bi) + if err != nil { + return err + } + return f(peer) + } + } + return ErrNoAvailablePeers +} + func (pmgr *PeerMgr) onDisconnect(p mPeer) { // Blocking until peer is disconnected p.NotifyDisconnect() pmgr.pLock.Lock() - delete(pmgr.peers, p) - pmgr.pLock.Unlock() + defer func() { + delete(pmgr.peers, p) + pmgr.pLock.Unlock() + }() + + // Add all of peers outstanding block requests into + // the peer managers pendingBlockRequestCache + + val, ok := pmgr.peers[p] + if !ok { + return + } + + pendingRequests, err := val.blockCache.pickAllItems() + if err != nil { + fmt.Println(err.Error()) + return + } + + err = pmgr.requestCache.addBlockInfos(pendingRequests) + if err != nil { + fmt.Println(err.Error()) + return + } } diff --git a/pkg/peermgr/peermgr_test.go b/pkg/peermgr/peermgr_test.go index 86d5be41c..9a725af43 100644 --- a/pkg/peermgr/peermgr_test.go +++ b/pkg/peermgr/peermgr_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/CityOfZion/neo-go/pkg/wire/command" - "github.com/CityOfZion/neo-go/pkg/wire/util" "github.com/stretchr/testify/assert" ) @@ -59,42 +58,65 @@ func TestRequestBlocks(t *testing.T) { pmgr.AddPeer(peerB) pmgr.AddPeer(peerC) - err := pmgr.RequestBlock(util.Uint256{}) + firstBlock := randomBlockInfo(t) + err := pmgr.RequestBlock(firstBlock) assert.Nil(t, err) - err = pmgr.RequestBlock(util.Uint256{}) + secondBlock := randomBlockInfo(t) + err = pmgr.RequestBlock(secondBlock) assert.Nil(t, err) - err = pmgr.RequestBlock(util.Uint256{}) + thirdBlock := randomBlockInfo(t) + err = pmgr.RequestBlock(thirdBlock) assert.Nil(t, err) // Since the peer manager did not get a MsgReceived // in between the block requests // a request should be sent to all peers + // This is only true, if peerBlockCacheLimit == 1 assert.Equal(t, 1, peerA.blockRequested) assert.Equal(t, 1, peerB.blockRequested) assert.Equal(t, 1, peerC.blockRequested) // Since the peer manager still has not received a MsgReceived - // another call to request blocks, will return a NoAvailablePeerError + // another call to request blocks, will add the request to the cache + // and return a nil err - err = pmgr.RequestBlock(util.Uint256{}) - assert.Equal(t, ErrNoAvailablePeers, err) + fourthBlock := randomBlockInfo(t) + err = pmgr.RequestBlock(fourthBlock) + assert.Equal(t, nil, err) + assert.Equal(t, 1, pmgr.requestCache.cacheLen()) - // If we tell the peer manager that peerA has given us a block - // then send another BlockRequest. It will go to peerA - // since the other two peers are still busy with their - // block requests + // If we tell the peer manager that we have received a block + // it will check the cache for any pending requests and send a block request if there are any. + // The request will go to the peer who sent back the block corresponding to the first hash + // since the other two peers are still busy with their block requests - pmgr.MsgReceived(peerA, command.Block) - err = pmgr.RequestBlock(util.Uint256{}) + peer := findPeerwithHash(t, pmgr, firstBlock.BlockHash) + err = pmgr.BlockMsgReceived(peer, firstBlock) assert.Nil(t, err) - assert.Equal(t, 2, peerA.blockRequested) - assert.Equal(t, 1, peerB.blockRequested) - assert.Equal(t, 1, peerC.blockRequested) + totalRequests := peerA.blockRequested + peerB.blockRequested + peerC.blockRequested + assert.Equal(t, 4, totalRequests) + + // // cache should be empty now + assert.Equal(t, 0, pmgr.requestCache.cacheLen()) } + +// The peer manager does not tell you what peer was sent a particular block request +// For testing purposes, the following function will find that peer +func findPeerwithHash(t *testing.T, pmgr *PeerMgr, blockHash util.Uint256) mPeer { + for peer, stats := range pmgr.peers { + _, err := stats.blockCache.findHash(blockHash) + if err == nil { + return peer + } + } + assert.Fail(t, "cannot find a peer with that hash") + return nil +} + func TestRequestHeaders(t *testing.T) { pmgr := New() @@ -152,7 +174,7 @@ func TestUnknownPeer(t *testing.T) { quit: make(chan bool), } - err := pmgr.MsgReceived(unknownPeer, command.Block) + err := pmgr.MsgReceived(unknownPeer, command.Headers) assert.Equal(t, true, unknownPeer.disconnected) assert.Equal(t, ErrUnknownPeer, err) } diff --git a/pkg/server/syncmgr.go b/pkg/server/syncmgr.go index 3df456626..7de87188b 100644 --- a/pkg/server/syncmgr.go +++ b/pkg/server/syncmgr.go @@ -3,6 +3,8 @@ package server import ( "encoding/binary" + "github.com/CityOfZion/neo-go/pkg/peermgr" + "github.com/CityOfZion/neo-go/pkg/peer" "github.com/CityOfZion/neo-go/pkg/syncmgr" "github.com/CityOfZion/neo-go/pkg/wire/payload" @@ -34,7 +36,10 @@ func (s *Server) onHeader(peer *peer.Peer, hdrsMessage *payload.HeadersMessage) } func (s *Server) onBlock(peer *peer.Peer, blockMsg *payload.BlockMessage) { - s.pmg.MsgReceived(peer, blockMsg.Command()) + s.pmg.BlockMsgReceived(peer, peermgr.BlockInfo{ + BlockHash: blockMsg.Hash, + BlockIndex: blockMsg.Index, + }) s.smg.OnBlock(peer, blockMsg) } @@ -50,8 +55,11 @@ func (s *Server) requestHeaders(hash util.Uint256) error { return s.pmg.RequestHeaders(hash) } -func (s *Server) requestBlock(hash util.Uint256) error { - return s.pmg.RequestBlock(hash) +func (s *Server) requestBlock(hash util.Uint256, index uint32) error { + return s.pmg.RequestBlock(peermgr.BlockInfo{ + BlockHash: hash, + BlockIndex: index, + }) } // getNextBlockHash searches the database for the blockHash diff --git a/pkg/syncmgr/blockmode.go b/pkg/syncmgr/blockmode.go index 2e5dadd6f..83d3acd50 100644 --- a/pkg/syncmgr/blockmode.go +++ b/pkg/syncmgr/blockmode.go @@ -32,7 +32,7 @@ func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error { if err != nil { return err } - err = s.cfg.RequestBlock(nextHash) + err = s.cfg.RequestBlock(nextHash, block.Index) return err } diff --git a/pkg/syncmgr/config.go b/pkg/syncmgr/config.go index af27f37c6..713059802 100644 --- a/pkg/syncmgr/config.go +++ b/pkg/syncmgr/config.go @@ -18,7 +18,7 @@ type Config struct { //RequestBlock will send a getdata request for the block // with the hash passed as a parameter - RequestBlock func(hash util.Uint256) error + RequestBlock func(hash util.Uint256, index uint32) error // GetNextBlockHash returns the block hash of the header infront of thr block // at the tip of this nodes chain. This assumes that the node is not in sync diff --git a/pkg/syncmgr/headermode.go b/pkg/syncmgr/headermode.go index 898dc933d..3a8e4d681 100644 --- a/pkg/syncmgr/headermode.go +++ b/pkg/syncmgr/headermode.go @@ -12,6 +12,7 @@ func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) // Note: For the un-optimised version, we move straight to blocksOnly mode firstHash := hdrs[0].Hash + firstHdrIndex := hdrs[0].Index err := s.cfg.ProcessHeaders(hdrs) if err == nil { @@ -19,7 +20,7 @@ func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) s.headerHash = hdrs[len(hdrs)-1].Hash s.syncmode = blockMode - return s.cfg.RequestBlock(firstHash) + return s.cfg.RequestBlock(firstHash, firstHdrIndex) } // Check whether it is a validation error, or a database error diff --git a/pkg/syncmgr/mockhelpers_test.go b/pkg/syncmgr/mockhelpers_test.go index 8c84225ac..157fdd513 100644 --- a/pkg/syncmgr/mockhelpers_test.go +++ b/pkg/syncmgr/mockhelpers_test.go @@ -46,7 +46,7 @@ func (s *syncTestHelper) FetchBlockAgain(util.Uint256) error { return s.err } -func (s *syncTestHelper) RequestBlock(util.Uint256) error { +func (s *syncTestHelper) RequestBlock(util.Uint256, uint32) error { s.blockFetchRequest++ return s.err } diff --git a/pkg/syncmgr/normalmode.go b/pkg/syncmgr/normalmode.go index 218d6151b..10bbac365 100644 --- a/pkg/syncmgr/normalmode.go +++ b/pkg/syncmgr/normalmode.go @@ -16,6 +16,7 @@ func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) lenHeaders := len(hdrs) firstHash := hdrs[0].Hash + firstHdrIndex := hdrs[0].Index lastHash := hdrs[lenHeaders-1].Hash // Update syncmgr latest header @@ -32,7 +33,7 @@ func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) // Bounds state that len > 1 && len!= 2000 & maxHeadersInMessage == 2000 // This means that we have less than 2k headers s.syncmode = blockMode - return s.cfg.RequestBlock(firstHash) + return s.cfg.RequestBlock(firstHash, firstHdrIndex) } // normalModeOnBlock is called when the sync manager is normal mode