Merge pull request #264 from nspcc-dev/blockPool

syncmgr : Add blockpool to syncmgr
This commit is contained in:
Roman Khimov 2019-08-12 12:02:09 +03:00 committed by GitHub
commit 03629a31c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 170 additions and 54 deletions

View file

@ -1,7 +1,6 @@
package server package server
import ( import (
"encoding/hex"
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
@ -9,7 +8,6 @@ import (
"github.com/CityOfZion/neo-go/pkg/connmgr" "github.com/CityOfZion/neo-go/pkg/connmgr"
"github.com/CityOfZion/neo-go/pkg/peer" "github.com/CityOfZion/neo-go/pkg/peer"
"github.com/CityOfZion/neo-go/pkg/wire/util"
iputils "github.com/CityOfZion/neo-go/pkg/wire/util/ip" iputils "github.com/CityOfZion/neo-go/pkg/wire/util/ip"
) )
@ -34,19 +32,6 @@ func (s *Server) onConnection(conn net.Conn, addr string) {
} }
s.pmg.AddPeer(p) s.pmg.AddPeer(p)
byt, err := hex.DecodeString("d42561e3d30e15be6400b6df2f328e02d2bf6354c41dce433bc57687c82144bf")
if err != nil {
fmt.Println("Error getting hash " + err.Error())
}
lh, err := util.Uint256DecodeBytes(byt)
if err != nil {
fmt.Println("Error getting hash " + err.Error())
}
err = p.RequestHeaders(lh.Reverse())
if err != nil {
fmt.Println(err)
}
} }
func (s *Server) onAccept(conn net.Conn) { func (s *Server) onAccept(conn net.Conn) {

View file

@ -56,7 +56,10 @@ func New(net protocol.Magic, port uint16) (*Server, error) {
s.chain = chain s.chain = chain
// Setup sync manager // Setup sync manager
syncmgr := setupSyncManager(s) syncmgr, err := setupSyncManager(s)
if err != nil {
return nil, err
}
s.smg = syncmgr s.smg = syncmgr
// Setup connection manager // Setup connection manager
@ -92,7 +95,8 @@ func (s *Server) Run() error {
if err != nil { if err != nil {
return err return err
} }
err = s.pmg.RequestHeaders(bestHeader.Hash.Reverse())
err = s.pmg.RequestHeaders(bestHeader.Hash)
if err != nil { if err != nil {
return err return err
} }

View file

@ -11,7 +11,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/wire/util" "github.com/CityOfZion/neo-go/pkg/wire/util"
) )
func setupSyncManager(s *Server) *syncmgr.Syncmgr { func setupSyncManager(s *Server) (*syncmgr.Syncmgr, error) {
cfg := &syncmgr.Config{ cfg := &syncmgr.Config{
ProcessBlock: s.processBlock, ProcessBlock: s.processBlock,
@ -27,7 +27,15 @@ func setupSyncManager(s *Server) *syncmgr.Syncmgr {
FetchBlockAgain: s.fetchBlockAgain, FetchBlockAgain: s.fetchBlockAgain,
} }
return syncmgr.New(cfg) // Add nextBlockIndex in syncmgr
lastBlock, err := s.chain.Db.GetLastBlock()
if err != nil {
return nil, err
}
nextBlockIndex := lastBlock.Index + 1
return syncmgr.New(cfg, nextBlockIndex), nil
} }
func (s *Server) onHeader(peer *peer.Peer, hdrsMessage *payload.HeadersMessage) { func (s *Server) onHeader(peer *peer.Peer, hdrsMessage *payload.HeadersMessage) {

View file

@ -9,22 +9,29 @@ import (
// and receives a block. // and receives a block.
func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error { func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error {
// Process Block // Check if it is a future block
err := s.cfg.ProcessBlock(block) // XXX: since we are storing blocks in memory, we do not want to store blocks
// from the tip
if err == chain.ErrFutureBlock { if block.Index > s.nextBlockIndex+2000 {
// XXX(Optimisation): We can cache future blocks in blockmode, if we have the corresponding header return nil
// We can have the server cache them and sort out the semantics for when to send them to the chain }
// Server can listen on chain for when a new block is saved if block.Index > s.nextBlockIndex {
// or we could embed a struct in this syncmgr called blockCache, syncmgr can just tell it when it has processed s.addToBlockPool(block)
//a block and we can call ProcessBlock return nil
return err
} }
// Process Block
err := s.processBlock(block)
if err != nil && err != chain.ErrBlockAlreadyExists { if err != nil && err != chain.ErrBlockAlreadyExists {
return s.cfg.FetchBlockAgain(block.Hash) return s.cfg.FetchBlockAgain(block.Hash)
} }
// Check the block pool
err = s.checkPool()
if err != nil {
return err
}
// Check if blockhashReceived == the header hash from last get headers this node performed // Check if blockhashReceived == the header hash from last get headers this node performed
// if not then increment and request next block // if not then increment and request next block
if s.headerHash != block.Hash { if s.headerHash != block.Hash {
@ -32,8 +39,7 @@ func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error {
if err != nil { if err != nil {
return err return err
} }
err = s.cfg.RequestBlock(nextHash, block.Index) return s.cfg.RequestBlock(nextHash, block.Index)
return err
} }
// If we are caught up then go into normal mode // If we are caught up then go into normal mode

57
pkg/syncmgr/blockpool.go Normal file
View file

@ -0,0 +1,57 @@
package syncmgr
import (
"sort"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
func (s *Syncmgr) addToBlockPool(newBlock payload.Block) {
s.poolLock.Lock()
defer s.poolLock.Unlock()
for _, block := range s.blockPool {
if block.Index == newBlock.Index {
return
}
}
s.blockPool = append(s.blockPool, newBlock)
// sort slice using block index
sort.Slice(s.blockPool, func(i, j int) bool {
return s.blockPool[i].Index < s.blockPool[j].Index
})
}
func (s *Syncmgr) checkPool() error {
// Assuming that the blocks are sorted in order
var indexesToRemove = -1
s.poolLock.Lock()
defer func() {
// removes all elements before this index, including the element at this index
s.blockPool = s.blockPool[indexesToRemove+1:]
s.poolLock.Unlock()
}()
// loop iterates through the cache, processing any
// blocks that can be added to the chain
for i, block := range s.blockPool {
if s.nextBlockIndex != block.Index {
break
}
// Save this block and save the indice location so we can remove it, when we defer
err := s.processBlock(block)
if err != nil {
return err
}
indexesToRemove = i
}
return nil
}

View file

@ -0,0 +1,42 @@
package syncmgr
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestAddBlockPoolFlush(t *testing.T) {
syncmgr, _ := setupSyncMgr(blockMode, 10)
blockMessage := randomBlockMessage(t, 11)
peer := &mockPeer{
height: 100,
}
// Since the block has Index 11 and the sync manager needs the block with index 10
// This block will be added to the blockPool
err := syncmgr.OnBlock(peer, blockMessage)
assert.Nil(t, err)
assert.Equal(t, 1, len(syncmgr.blockPool))
// The sync manager is still looking for the block at height 10
// Since this block is at height 12, it will be added to the block pool
blockMessage = randomBlockMessage(t, 12)
err = syncmgr.OnBlock(peer, blockMessage)
assert.Nil(t, err)
assert.Equal(t, 2, len(syncmgr.blockPool))
// This is the block that the sync manager was waiting for
// It should process this block, the check the pool for the next set of blocks
blockMessage = randomBlockMessage(t, 10)
err = syncmgr.OnBlock(peer, blockMessage)
assert.Nil(t, err)
assert.Equal(t, 0, len(syncmgr.blockPool))
// Since we processed 3 blocks and the sync manager started
//looking for block with index 10. The syncmananger should be looking for
// the block with index 13
assert.Equal(t, uint32(13), syncmgr.nextBlockIndex)
}

View file

@ -24,9 +24,6 @@ type Config struct {
// at the tip of this nodes chain. This assumes that the node is not in sync // at the tip of this nodes chain. This assumes that the node is not in sync
GetNextBlockHash func() (util.Uint256, error) GetNextBlockHash func() (util.Uint256, error)
// GetBestBlockHash gets the block hash of the last saved block.
GetBestBlockHash func() (util.Uint256, error)
// AskForNewBlocks will send out a message to the network // AskForNewBlocks will send out a message to the network
// asking for new blocks // asking for new blocks
AskForNewBlocks func() AskForNewBlocks func()

View file

@ -89,7 +89,7 @@ func randomUint256(t *testing.T) util.Uint256 {
return u return u
} }
func setupSyncMgr(mode mode) (*Syncmgr, *syncTestHelper) { func setupSyncMgr(mode mode, nextBlockIndex uint32) (*Syncmgr, *syncTestHelper) {
helper := &syncTestHelper{} helper := &syncTestHelper{}
cfg := &Config{ cfg := &Config{
@ -106,7 +106,7 @@ func setupSyncMgr(mode mode) (*Syncmgr, *syncTestHelper) {
RequestHeaders: helper.RequestHeaders, RequestHeaders: helper.RequestHeaders,
} }
syncmgr := New(cfg) syncmgr := New(cfg, nextBlockIndex)
syncmgr.syncmode = mode syncmgr.syncmode = mode
return syncmgr, helper return syncmgr, helper

View file

@ -43,7 +43,7 @@ func (s *Syncmgr) normalModeOnBlock(peer SyncPeer, block payload.Block) error {
s.timer.Stop() s.timer.Stop()
// process block // process block
err := s.cfg.ProcessBlock(block) err := s.processBlock(block)
if err != nil { if err != nil {
s.timer.Reset(blockTimer) s.timer.Reset(blockTimer)
return s.cfg.FetchBlockAgain(block.Hash) return s.cfg.FetchBlockAgain(block.Hash)

View file

@ -2,6 +2,7 @@ package syncmgr
import ( import (
"fmt" "fmt"
"sync"
"time" "time"
"github.com/CityOfZion/neo-go/pkg/wire/payload" "github.com/CityOfZion/neo-go/pkg/wire/payload"
@ -50,10 +51,14 @@ type Syncmgr struct {
// When receiving blocks, we can use this to determine whether the node has downloaded // When receiving blocks, we can use this to determine whether the node has downloaded
// all of the blocks for the last headers messages // all of the blocks for the last headers messages
headerHash util.Uint256 headerHash util.Uint256
poolLock sync.Mutex
blockPool []payload.Block
nextBlockIndex uint32
} }
// New creates a new sync manager // New creates a new sync manager
func New(cfg *Config) *Syncmgr { func New(cfg *Config, nextBlockIndex uint32) *Syncmgr {
newBlockTimer := time.AfterFunc(blockTimer, func() { newBlockTimer := time.AfterFunc(blockTimer, func() {
cfg.AskForNewBlocks() cfg.AskForNewBlocks()
@ -61,9 +66,10 @@ func New(cfg *Config) *Syncmgr {
newBlockTimer.Stop() newBlockTimer.Stop()
return &Syncmgr{ return &Syncmgr{
syncmode: headersMode, syncmode: headersMode,
cfg: cfg, cfg: cfg,
timer: newBlockTimer, timer: newBlockTimer,
nextBlockIndex: nextBlockIndex,
} }
} }
@ -133,3 +139,14 @@ func (s *Syncmgr) OnBlock(peer SyncPeer, msg *payload.BlockMessage) error {
func (s *Syncmgr) IsCurrent() bool { func (s *Syncmgr) IsCurrent() bool {
return s.syncmode == normalMode return s.syncmode == normalMode
} }
func (s *Syncmgr) processBlock(block payload.Block) error {
err := s.cfg.ProcessBlock(block)
if err != nil {
return err
}
s.nextBlockIndex++
return nil
}

View file

@ -11,7 +11,7 @@ import (
func TestHeadersModeOnBlock(t *testing.T) { func TestHeadersModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode) syncmgr, helper := setupSyncMgr(headersMode, 0)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
@ -21,7 +21,7 @@ func TestHeadersModeOnBlock(t *testing.T) {
func TestBlockModeOnBlock(t *testing.T) { func TestBlockModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode) syncmgr, helper := setupSyncMgr(blockMode, 0)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
@ -30,7 +30,7 @@ func TestBlockModeOnBlock(t *testing.T) {
} }
func TestNormalModeOnBlock(t *testing.T) { func TestNormalModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode) syncmgr, helper := setupSyncMgr(normalMode, 0)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
@ -40,7 +40,7 @@ func TestNormalModeOnBlock(t *testing.T) {
func TestBlockModeToNormalMode(t *testing.T) { func TestBlockModeToNormalMode(t *testing.T) {
syncmgr, _ := setupSyncMgr(blockMode) syncmgr, _ := setupSyncMgr(blockMode, 100)
peer := &mockPeer{ peer := &mockPeer{
height: 100, height: 100,
@ -57,7 +57,7 @@ func TestBlockModeToNormalMode(t *testing.T) {
} }
func TestBlockModeStayInBlockMode(t *testing.T) { func TestBlockModeStayInBlockMode(t *testing.T) {
syncmgr, _ := setupSyncMgr(blockMode) syncmgr, _ := setupSyncMgr(blockMode, 0)
// We need our latest know hash to not be equal to the hash // We need our latest know hash to not be equal to the hash
// of the block we received, to stay in blockmode // of the block we received, to stay in blockmode
@ -77,7 +77,7 @@ func TestBlockModeStayInBlockMode(t *testing.T) {
} }
func TestBlockModeAlreadyExistsErr(t *testing.T) { func TestBlockModeAlreadyExistsErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode) syncmgr, helper := setupSyncMgr(blockMode, 100)
helper.err = chain.ErrBlockAlreadyExists helper.err = chain.ErrBlockAlreadyExists
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 100)) syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 100))

View file

@ -12,7 +12,7 @@ import (
func TestHeadersModeOnHeaders(t *testing.T) { func TestHeadersModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode) syncmgr, helper := setupSyncMgr(headersMode, 0)
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 0)) syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 0))
@ -29,14 +29,14 @@ func TestHeadersModeOnHeaders(t *testing.T) {
} }
func TestBlockModeOnHeaders(t *testing.T) { func TestBlockModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode) syncmgr, helper := setupSyncMgr(blockMode, 0)
// If we receive a header in blockmode, no headers will be processed // If we receive a header in blockmode, no headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100)) syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100))
assert.Equal(t, 0, helper.headersProcessed) assert.Equal(t, 0, helper.headersProcessed)
} }
func TestNormalModeOnHeadersMaxHeaders(t *testing.T) { func TestNormalModeOnHeadersMaxHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode) syncmgr, helper := setupSyncMgr(normalMode, 0)
// If we receive a header in normalmode, headers will be processed // If we receive a header in normalmode, headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 2000)) syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 2000))
@ -49,7 +49,7 @@ func TestNormalModeOnHeadersMaxHeaders(t *testing.T) {
// This differs from the previous function in that // This differs from the previous function in that
//we did not receive the max amount of headers //we did not receive the max amount of headers
func TestNormalModeOnHeaders(t *testing.T) { func TestNormalModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode) syncmgr, helper := setupSyncMgr(normalMode, 0)
// If we receive a header in normalmode, headers will be processed // If we receive a header in normalmode, headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
@ -60,7 +60,7 @@ func TestNormalModeOnHeaders(t *testing.T) {
} }
func TestLastHeaderUpdates(t *testing.T) { func TestLastHeaderUpdates(t *testing.T) {
syncmgr, _ := setupSyncMgr(headersMode) syncmgr, _ := setupSyncMgr(headersMode, 0)
hdrsMessage := randomHeadersMessage(t, 200) hdrsMessage := randomHeadersMessage(t, 200)
hdrs := hdrsMessage.Headers hdrs := hdrsMessage.Headers
@ -95,7 +95,7 @@ func TestLastHeaderUpdates(t *testing.T) {
func TestHeadersModeOnHeadersErr(t *testing.T) { func TestHeadersModeOnHeadersErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode) syncmgr, helper := setupSyncMgr(headersMode, 0)
helper.err = &chain.ValidationError{} helper.err = &chain.ValidationError{}
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
@ -106,7 +106,7 @@ func TestHeadersModeOnHeadersErr(t *testing.T) {
} }
func TestNormalModeOnHeadersErr(t *testing.T) { func TestNormalModeOnHeadersErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode) syncmgr, helper := setupSyncMgr(normalMode, 0)
helper.err = &chain.ValidationError{} helper.err = &chain.ValidationError{}
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))