diff --git a/pkg/server/connmgr.go b/pkg/server/connmgr.go index 64b16bdaf..e97416801 100644 --- a/pkg/server/connmgr.go +++ b/pkg/server/connmgr.go @@ -1,7 +1,6 @@ package server import ( - "encoding/hex" "fmt" "net" "strconv" @@ -9,7 +8,6 @@ import ( "github.com/CityOfZion/neo-go/pkg/connmgr" "github.com/CityOfZion/neo-go/pkg/peer" - "github.com/CityOfZion/neo-go/pkg/wire/util" iputils "github.com/CityOfZion/neo-go/pkg/wire/util/ip" ) @@ -34,19 +32,6 @@ func (s *Server) onConnection(conn net.Conn, addr string) { } s.pmg.AddPeer(p) - - byt, err := hex.DecodeString("d42561e3d30e15be6400b6df2f328e02d2bf6354c41dce433bc57687c82144bf") - if err != nil { - fmt.Println("Error getting hash " + err.Error()) - } - lh, err := util.Uint256DecodeBytes(byt) - if err != nil { - fmt.Println("Error getting hash " + err.Error()) - } - err = p.RequestHeaders(lh.Reverse()) - if err != nil { - fmt.Println(err) - } } func (s *Server) onAccept(conn net.Conn) { diff --git a/pkg/server/server.go b/pkg/server/server.go index b1e4c0f5b..22a781170 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -56,7 +56,10 @@ func New(net protocol.Magic, port uint16) (*Server, error) { s.chain = chain // Setup sync manager - syncmgr := setupSyncManager(s) + syncmgr, err := setupSyncManager(s) + if err != nil { + return nil, err + } s.smg = syncmgr // Setup connection manager @@ -92,7 +95,8 @@ func (s *Server) Run() error { if err != nil { return err } - err = s.pmg.RequestHeaders(bestHeader.Hash.Reverse()) + + err = s.pmg.RequestHeaders(bestHeader.Hash) if err != nil { return err } diff --git a/pkg/server/syncmgr.go b/pkg/server/syncmgr.go index 7de87188b..e6020d5b9 100644 --- a/pkg/server/syncmgr.go +++ b/pkg/server/syncmgr.go @@ -11,7 +11,7 @@ import ( "github.com/CityOfZion/neo-go/pkg/wire/util" ) -func setupSyncManager(s *Server) *syncmgr.Syncmgr { +func setupSyncManager(s *Server) (*syncmgr.Syncmgr, error) { cfg := &syncmgr.Config{ ProcessBlock: s.processBlock, @@ -27,7 +27,15 @@ func setupSyncManager(s *Server) *syncmgr.Syncmgr { FetchBlockAgain: s.fetchBlockAgain, } - return syncmgr.New(cfg) + // Add nextBlockIndex in syncmgr + lastBlock, err := s.chain.Db.GetLastBlock() + if err != nil { + return nil, err + } + + nextBlockIndex := lastBlock.Index + 1 + + return syncmgr.New(cfg, nextBlockIndex), nil } func (s *Server) onHeader(peer *peer.Peer, hdrsMessage *payload.HeadersMessage) { diff --git a/pkg/syncmgr/blockmode.go b/pkg/syncmgr/blockmode.go index 83d3acd50..f357ca642 100644 --- a/pkg/syncmgr/blockmode.go +++ b/pkg/syncmgr/blockmode.go @@ -9,22 +9,29 @@ import ( // and receives a block. func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error { - // Process Block - err := s.cfg.ProcessBlock(block) - - if err == chain.ErrFutureBlock { - // XXX(Optimisation): We can cache future blocks in blockmode, if we have the corresponding header - // We can have the server cache them and sort out the semantics for when to send them to the chain - // Server can listen on chain for when a new block is saved - // or we could embed a struct in this syncmgr called blockCache, syncmgr can just tell it when it has processed - //a block and we can call ProcessBlock - return err + // Check if it is a future block + // XXX: since we are storing blocks in memory, we do not want to store blocks + // from the tip + if block.Index > s.nextBlockIndex+2000 { + return nil + } + if block.Index > s.nextBlockIndex { + s.addToBlockPool(block) + return nil } + // Process Block + err := s.processBlock(block) if err != nil && err != chain.ErrBlockAlreadyExists { return s.cfg.FetchBlockAgain(block.Hash) } + // Check the block pool + err = s.checkPool() + if err != nil { + return err + } + // Check if blockhashReceived == the header hash from last get headers this node performed // if not then increment and request next block if s.headerHash != block.Hash { @@ -32,8 +39,7 @@ func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error { if err != nil { return err } - err = s.cfg.RequestBlock(nextHash, block.Index) - return err + return s.cfg.RequestBlock(nextHash, block.Index) } // If we are caught up then go into normal mode diff --git a/pkg/syncmgr/blockpool.go b/pkg/syncmgr/blockpool.go new file mode 100644 index 000000000..2b1f37761 --- /dev/null +++ b/pkg/syncmgr/blockpool.go @@ -0,0 +1,57 @@ +package syncmgr + +import ( + "sort" + + "github.com/CityOfZion/neo-go/pkg/wire/payload" +) + +func (s *Syncmgr) addToBlockPool(newBlock payload.Block) { + s.poolLock.Lock() + defer s.poolLock.Unlock() + + for _, block := range s.blockPool { + if block.Index == newBlock.Index { + return + } + } + + s.blockPool = append(s.blockPool, newBlock) + + // sort slice using block index + sort.Slice(s.blockPool, func(i, j int) bool { + return s.blockPool[i].Index < s.blockPool[j].Index + }) + +} + +func (s *Syncmgr) checkPool() error { + // Assuming that the blocks are sorted in order + + var indexesToRemove = -1 + + s.poolLock.Lock() + defer func() { + // removes all elements before this index, including the element at this index + s.blockPool = s.blockPool[indexesToRemove+1:] + s.poolLock.Unlock() + }() + + // loop iterates through the cache, processing any + // blocks that can be added to the chain + for i, block := range s.blockPool { + if s.nextBlockIndex != block.Index { + break + } + + // Save this block and save the indice location so we can remove it, when we defer + err := s.processBlock(block) + if err != nil { + return err + } + + indexesToRemove = i + } + + return nil +} diff --git a/pkg/syncmgr/blockpool_test.go b/pkg/syncmgr/blockpool_test.go new file mode 100644 index 000000000..c236afc2c --- /dev/null +++ b/pkg/syncmgr/blockpool_test.go @@ -0,0 +1,42 @@ +package syncmgr + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddBlockPoolFlush(t *testing.T) { + syncmgr, _ := setupSyncMgr(blockMode, 10) + + blockMessage := randomBlockMessage(t, 11) + + peer := &mockPeer{ + height: 100, + } + + // Since the block has Index 11 and the sync manager needs the block with index 10 + // This block will be added to the blockPool + err := syncmgr.OnBlock(peer, blockMessage) + assert.Nil(t, err) + assert.Equal(t, 1, len(syncmgr.blockPool)) + + // The sync manager is still looking for the block at height 10 + // Since this block is at height 12, it will be added to the block pool + blockMessage = randomBlockMessage(t, 12) + err = syncmgr.OnBlock(peer, blockMessage) + assert.Nil(t, err) + assert.Equal(t, 2, len(syncmgr.blockPool)) + + // This is the block that the sync manager was waiting for + // It should process this block, the check the pool for the next set of blocks + blockMessage = randomBlockMessage(t, 10) + err = syncmgr.OnBlock(peer, blockMessage) + assert.Nil(t, err) + assert.Equal(t, 0, len(syncmgr.blockPool)) + + // Since we processed 3 blocks and the sync manager started + //looking for block with index 10. The syncmananger should be looking for + // the block with index 13 + assert.Equal(t, uint32(13), syncmgr.nextBlockIndex) +} diff --git a/pkg/syncmgr/config.go b/pkg/syncmgr/config.go index 713059802..43387d414 100644 --- a/pkg/syncmgr/config.go +++ b/pkg/syncmgr/config.go @@ -24,9 +24,6 @@ type Config struct { // at the tip of this nodes chain. This assumes that the node is not in sync GetNextBlockHash func() (util.Uint256, error) - // GetBestBlockHash gets the block hash of the last saved block. - GetBestBlockHash func() (util.Uint256, error) - // AskForNewBlocks will send out a message to the network // asking for new blocks AskForNewBlocks func() diff --git a/pkg/syncmgr/mockhelpers_test.go b/pkg/syncmgr/mockhelpers_test.go index 157fdd513..d95e95e6a 100644 --- a/pkg/syncmgr/mockhelpers_test.go +++ b/pkg/syncmgr/mockhelpers_test.go @@ -89,7 +89,7 @@ func randomUint256(t *testing.T) util.Uint256 { return u } -func setupSyncMgr(mode mode) (*Syncmgr, *syncTestHelper) { +func setupSyncMgr(mode mode, nextBlockIndex uint32) (*Syncmgr, *syncTestHelper) { helper := &syncTestHelper{} cfg := &Config{ @@ -106,7 +106,7 @@ func setupSyncMgr(mode mode) (*Syncmgr, *syncTestHelper) { RequestHeaders: helper.RequestHeaders, } - syncmgr := New(cfg) + syncmgr := New(cfg, nextBlockIndex) syncmgr.syncmode = mode return syncmgr, helper diff --git a/pkg/syncmgr/normalmode.go b/pkg/syncmgr/normalmode.go index 10bbac365..ad22e52f2 100644 --- a/pkg/syncmgr/normalmode.go +++ b/pkg/syncmgr/normalmode.go @@ -43,7 +43,7 @@ func (s *Syncmgr) normalModeOnBlock(peer SyncPeer, block payload.Block) error { s.timer.Stop() // process block - err := s.cfg.ProcessBlock(block) + err := s.processBlock(block) if err != nil { s.timer.Reset(blockTimer) return s.cfg.FetchBlockAgain(block.Hash) diff --git a/pkg/syncmgr/syncmgr.go b/pkg/syncmgr/syncmgr.go index bb06dfa7e..dc639a8d3 100644 --- a/pkg/syncmgr/syncmgr.go +++ b/pkg/syncmgr/syncmgr.go @@ -2,6 +2,7 @@ package syncmgr import ( "fmt" + "sync" "time" "github.com/CityOfZion/neo-go/pkg/wire/payload" @@ -50,10 +51,14 @@ type Syncmgr struct { // When receiving blocks, we can use this to determine whether the node has downloaded // all of the blocks for the last headers messages headerHash util.Uint256 + + poolLock sync.Mutex + blockPool []payload.Block + nextBlockIndex uint32 } // New creates a new sync manager -func New(cfg *Config) *Syncmgr { +func New(cfg *Config, nextBlockIndex uint32) *Syncmgr { newBlockTimer := time.AfterFunc(blockTimer, func() { cfg.AskForNewBlocks() @@ -61,9 +66,10 @@ func New(cfg *Config) *Syncmgr { newBlockTimer.Stop() return &Syncmgr{ - syncmode: headersMode, - cfg: cfg, - timer: newBlockTimer, + syncmode: headersMode, + cfg: cfg, + timer: newBlockTimer, + nextBlockIndex: nextBlockIndex, } } @@ -133,3 +139,14 @@ func (s *Syncmgr) OnBlock(peer SyncPeer, msg *payload.BlockMessage) error { func (s *Syncmgr) IsCurrent() bool { return s.syncmode == normalMode } + +func (s *Syncmgr) processBlock(block payload.Block) error { + err := s.cfg.ProcessBlock(block) + if err != nil { + return err + } + + s.nextBlockIndex++ + + return nil +} diff --git a/pkg/syncmgr/syncmgr_onblock_test.go b/pkg/syncmgr/syncmgr_onblock_test.go index d5b79f0b3..d08aa8a74 100644 --- a/pkg/syncmgr/syncmgr_onblock_test.go +++ b/pkg/syncmgr/syncmgr_onblock_test.go @@ -11,7 +11,7 @@ import ( func TestHeadersModeOnBlock(t *testing.T) { - syncmgr, helper := setupSyncMgr(headersMode) + syncmgr, helper := setupSyncMgr(headersMode, 0) syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) @@ -21,7 +21,7 @@ func TestHeadersModeOnBlock(t *testing.T) { func TestBlockModeOnBlock(t *testing.T) { - syncmgr, helper := setupSyncMgr(blockMode) + syncmgr, helper := setupSyncMgr(blockMode, 0) syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) @@ -30,7 +30,7 @@ func TestBlockModeOnBlock(t *testing.T) { } func TestNormalModeOnBlock(t *testing.T) { - syncmgr, helper := setupSyncMgr(normalMode) + syncmgr, helper := setupSyncMgr(normalMode, 0) syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) @@ -40,7 +40,7 @@ func TestNormalModeOnBlock(t *testing.T) { func TestBlockModeToNormalMode(t *testing.T) { - syncmgr, _ := setupSyncMgr(blockMode) + syncmgr, _ := setupSyncMgr(blockMode, 100) peer := &mockPeer{ height: 100, @@ -57,7 +57,7 @@ func TestBlockModeToNormalMode(t *testing.T) { } func TestBlockModeStayInBlockMode(t *testing.T) { - syncmgr, _ := setupSyncMgr(blockMode) + syncmgr, _ := setupSyncMgr(blockMode, 0) // We need our latest know hash to not be equal to the hash // of the block we received, to stay in blockmode @@ -77,7 +77,7 @@ func TestBlockModeStayInBlockMode(t *testing.T) { } func TestBlockModeAlreadyExistsErr(t *testing.T) { - syncmgr, helper := setupSyncMgr(blockMode) + syncmgr, helper := setupSyncMgr(blockMode, 100) helper.err = chain.ErrBlockAlreadyExists syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 100)) diff --git a/pkg/syncmgr/syncmgr_onheaders_test.go b/pkg/syncmgr/syncmgr_onheaders_test.go index 2f60a4b72..f2bcef3f3 100644 --- a/pkg/syncmgr/syncmgr_onheaders_test.go +++ b/pkg/syncmgr/syncmgr_onheaders_test.go @@ -12,7 +12,7 @@ import ( func TestHeadersModeOnHeaders(t *testing.T) { - syncmgr, helper := setupSyncMgr(headersMode) + syncmgr, helper := setupSyncMgr(headersMode, 0) syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 0)) @@ -29,14 +29,14 @@ func TestHeadersModeOnHeaders(t *testing.T) { } func TestBlockModeOnHeaders(t *testing.T) { - syncmgr, helper := setupSyncMgr(blockMode) + syncmgr, helper := setupSyncMgr(blockMode, 0) // If we receive a header in blockmode, no headers will be processed syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100)) assert.Equal(t, 0, helper.headersProcessed) } func TestNormalModeOnHeadersMaxHeaders(t *testing.T) { - syncmgr, helper := setupSyncMgr(normalMode) + syncmgr, helper := setupSyncMgr(normalMode, 0) // If we receive a header in normalmode, headers will be processed syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 2000)) @@ -49,7 +49,7 @@ func TestNormalModeOnHeadersMaxHeaders(t *testing.T) { // This differs from the previous function in that //we did not receive the max amount of headers func TestNormalModeOnHeaders(t *testing.T) { - syncmgr, helper := setupSyncMgr(normalMode) + syncmgr, helper := setupSyncMgr(normalMode, 0) // If we receive a header in normalmode, headers will be processed syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) @@ -60,7 +60,7 @@ func TestNormalModeOnHeaders(t *testing.T) { } func TestLastHeaderUpdates(t *testing.T) { - syncmgr, _ := setupSyncMgr(headersMode) + syncmgr, _ := setupSyncMgr(headersMode, 0) hdrsMessage := randomHeadersMessage(t, 200) hdrs := hdrsMessage.Headers @@ -95,7 +95,7 @@ func TestLastHeaderUpdates(t *testing.T) { func TestHeadersModeOnHeadersErr(t *testing.T) { - syncmgr, helper := setupSyncMgr(headersMode) + syncmgr, helper := setupSyncMgr(headersMode, 0) helper.err = &chain.ValidationError{} syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) @@ -106,7 +106,7 @@ func TestHeadersModeOnHeadersErr(t *testing.T) { } func TestNormalModeOnHeadersErr(t *testing.T) { - syncmgr, helper := setupSyncMgr(normalMode) + syncmgr, helper := setupSyncMgr(normalMode, 0) helper.err = &chain.ValidationError{} syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))