diff --git a/pkg/syncmgr/blockmode.go b/pkg/syncmgr/blockmode.go new file mode 100644 index 000000000..2e5dadd6f --- /dev/null +++ b/pkg/syncmgr/blockmode.go @@ -0,0 +1,55 @@ +package syncmgr + +import ( + "github.com/CityOfZion/neo-go/pkg/chain" + "github.com/CityOfZion/neo-go/pkg/wire/payload" +) + +// blockModeOnBlock is called when the sync manager is block mode +// and receives a block. +func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error { + + // Process Block + err := s.cfg.ProcessBlock(block) + + if err == chain.ErrFutureBlock { + // XXX(Optimisation): We can cache future blocks in blockmode, if we have the corresponding header + // We can have the server cache them and sort out the semantics for when to send them to the chain + // Server can listen on chain for when a new block is saved + // or we could embed a struct in this syncmgr called blockCache, syncmgr can just tell it when it has processed + //a block and we can call ProcessBlock + return err + } + + if err != nil && err != chain.ErrBlockAlreadyExists { + return s.cfg.FetchBlockAgain(block.Hash) + } + + // Check if blockhashReceived == the header hash from last get headers this node performed + // if not then increment and request next block + if s.headerHash != block.Hash { + nextHash, err := s.cfg.GetNextBlockHash() + if err != nil { + return err + } + err = s.cfg.RequestBlock(nextHash) + return err + } + + // If we are caught up then go into normal mode + diff := peer.Height() - block.Index + if diff <= cruiseHeight { + s.syncmode = normalMode + s.timer.Reset(blockTimer) + return nil + } + + // If not then we go back into headersMode and request more headers. + s.syncmode = headersMode + return s.cfg.RequestHeaders(block.Hash) +} + +func (s *Syncmgr) blockModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error { + // We ignore headers when in this mode + return nil +} diff --git a/pkg/syncmgr/config.go b/pkg/syncmgr/config.go new file mode 100644 index 000000000..bc921b33b --- /dev/null +++ b/pkg/syncmgr/config.go @@ -0,0 +1,47 @@ +package syncmgr + +import ( + "github.com/CityOfZion/neo-go/pkg/wire/payload" + "github.com/CityOfZion/neo-go/pkg/wire/util" +) + +// Config is the configuration file for the sync manager +type Config struct { + + // Chain functions + ProcessBlock func(msg payload.Block) error + ProcessHeaders func(hdrs []*payload.BlockBase) error + + // RequestHeaders will send a getHeaders request + // with the hash passed in as a parameter + RequestHeaders func(hash util.Uint256) error + + //RequestBlock will send a getdata request for the block + // with the hash passed as a parameter + RequestBlock func(hash util.Uint256) error + + // GetNextBlockHash returns the block hash of the header infront of thr block + // at the tip of this nodes chain. This assumes that the node is not in sync + GetNextBlockHash func() (util.Uint256, error) + + // GetBestBlockHash gets the block hash of the last saved block. + GetBestBlockHash func() (util.Uint256, error) + + // AskForNewBlocks will send out a message to the network + // asking for new blocks + AskForNewBlocks func() + + // FetchHeadersAgain is called when a peer has provided headers that have not + // validated properly. We pass in the hash of the first header + FetchHeadersAgain func(util.Uint256) error + + // FetchHeadersAgain is called when a peer has provided a block that has not + // validated properly. We pass in the hash of the block + FetchBlockAgain func(util.Uint256) error +} + +// SyncPeer represents a peer on the network +// that this node can sync with +type SyncPeer interface { + Height() uint32 +} diff --git a/pkg/syncmgr/headermode.go b/pkg/syncmgr/headermode.go new file mode 100644 index 000000000..898dc933d --- /dev/null +++ b/pkg/syncmgr/headermode.go @@ -0,0 +1,41 @@ +package syncmgr + +import ( + "github.com/CityOfZion/neo-go/pkg/chain" + "github.com/CityOfZion/neo-go/pkg/wire/payload" +) + +// headersModeOnHeaders is called when the sync manager is headers mode +// and receives a header. +func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error { + // If we are in Headers mode, then we just need to process the headers + // Note: For the un-optimised version, we move straight to blocksOnly mode + + firstHash := hdrs[0].Hash + + err := s.cfg.ProcessHeaders(hdrs) + if err == nil { + // Update syncmgr last header + s.headerHash = hdrs[len(hdrs)-1].Hash + + s.syncmode = blockMode + return s.cfg.RequestBlock(firstHash) + } + + // Check whether it is a validation error, or a database error + if _, ok := err.(*chain.ValidationError); ok { + // If we get a validation error we re-request the headers + // the method will automatically fetch from a different peer + // XXX: Add increment banScore for this peer + return s.cfg.FetchHeadersAgain(firstHash) + } + // This means it is a database error. We have no way to recover from this. + panic(err.Error()) +} + +// headersModeOnBlock is called when the sync manager is headers mode +// and receives a block. +func (s *Syncmgr) headersModeOnBlock(peer SyncPeer, block payload.Block) error { + // While in headers mode, ignore any blocks received + return nil +} diff --git a/pkg/syncmgr/mockhelpers_test.go b/pkg/syncmgr/mockhelpers_test.go new file mode 100644 index 000000000..8c84225ac --- /dev/null +++ b/pkg/syncmgr/mockhelpers_test.go @@ -0,0 +1,113 @@ +package syncmgr + +import ( + "crypto/rand" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/CityOfZion/neo-go/pkg/wire/payload" + "github.com/CityOfZion/neo-go/pkg/wire/util" +) + +type syncTestHelper struct { + blocksProcessed int + headersProcessed int + newBlockRequest int + headersFetchRequest int + blockFetchRequest int + err error +} + +func (s *syncTestHelper) ProcessBlock(msg payload.Block) error { + s.blocksProcessed++ + return s.err +} +func (s *syncTestHelper) ProcessHeaders(hdrs []*payload.BlockBase) error { + s.headersProcessed = s.headersProcessed + len(hdrs) + return s.err +} + +func (s *syncTestHelper) GetNextBlockHash() (util.Uint256, error) { + return util.Uint256{}, s.err +} + +func (s *syncTestHelper) AskForNewBlocks() { + s.newBlockRequest++ +} + +func (s *syncTestHelper) FetchHeadersAgain(util.Uint256) error { + s.headersFetchRequest++ + return s.err +} + +func (s *syncTestHelper) FetchBlockAgain(util.Uint256) error { + s.blockFetchRequest++ + return s.err +} + +func (s *syncTestHelper) RequestBlock(util.Uint256) error { + s.blockFetchRequest++ + return s.err +} + +func (s *syncTestHelper) RequestHeaders(util.Uint256) error { + s.headersFetchRequest++ + return s.err +} + +type mockPeer struct { + height uint32 +} + +func (p *mockPeer) Height() uint32 { return p.height } + +func randomHeadersMessage(t *testing.T, num int) *payload.HeadersMessage { + var hdrs []*payload.BlockBase + + for i := 0; i < num; i++ { + hash := randomUint256(t) + hdr := &payload.BlockBase{Hash: hash} + hdrs = append(hdrs, hdr) + } + + hdrsMsg, err := payload.NewHeadersMessage() + assert.Nil(t, err) + + hdrsMsg.Headers = hdrs + + return hdrsMsg +} + +func randomUint256(t *testing.T) util.Uint256 { + hash := make([]byte, 32) + _, err := rand.Read(hash) + assert.Nil(t, err) + + u, err := util.Uint256DecodeBytes(hash) + assert.Nil(t, err) + return u +} + +func setupSyncMgr(mode mode) (*Syncmgr, *syncTestHelper) { + helper := &syncTestHelper{} + + cfg := &Config{ + ProcessBlock: helper.ProcessBlock, + ProcessHeaders: helper.ProcessHeaders, + + GetNextBlockHash: helper.GetNextBlockHash, + AskForNewBlocks: helper.AskForNewBlocks, + + FetchHeadersAgain: helper.FetchHeadersAgain, + FetchBlockAgain: helper.FetchBlockAgain, + + RequestBlock: helper.RequestBlock, + RequestHeaders: helper.RequestHeaders, + } + + syncmgr := New(cfg) + syncmgr.syncmode = mode + + return syncmgr, helper +} diff --git a/pkg/syncmgr/normalmode.go b/pkg/syncmgr/normalmode.go new file mode 100644 index 000000000..218d6151b --- /dev/null +++ b/pkg/syncmgr/normalmode.go @@ -0,0 +1,59 @@ +package syncmgr + +import ( + "github.com/CityOfZion/neo-go/pkg/wire/payload" +) + +func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error { + // If in normal mode, first process the headers + err := s.cfg.ProcessHeaders(hdrs) + if err != nil { + // If something went wrong with processing the headers + // Ask another peer for the headers. + //XXX: Increment banscore for this peer + return s.cfg.FetchHeadersAgain(hdrs[0].Hash) + } + + lenHeaders := len(hdrs) + firstHash := hdrs[0].Hash + lastHash := hdrs[lenHeaders-1].Hash + + // Update syncmgr latest header + s.headerHash = lastHash + + // If there are 2k headers, then ask for more headers and switch back to headers mode. + if lenHeaders == 2000 { + s.syncmode = headersMode + return s.cfg.RequestHeaders(lastHash) + } + + // Ask for the corresponding block iff there is < 2k headers + // then switch to blocksMode + // Bounds state that len > 1 && len!= 2000 & maxHeadersInMessage == 2000 + // This means that we have less than 2k headers + s.syncmode = blockMode + return s.cfg.RequestBlock(firstHash) +} + +// normalModeOnBlock is called when the sync manager is normal mode +// and receives a block. +func (s *Syncmgr) normalModeOnBlock(peer SyncPeer, block payload.Block) error { + // stop the timer that periodically asks for blocks + s.timer.Stop() + + // process block + err := s.cfg.ProcessBlock(block) + if err != nil { + s.timer.Reset(blockTimer) + return s.cfg.FetchBlockAgain(block.Hash) + } + + diff := peer.Height() - block.Index + if diff > trailingHeight { + s.syncmode = headersMode + return s.cfg.RequestHeaders(block.Hash) + } + + s.timer.Reset(blockTimer) + return nil +} diff --git a/pkg/syncmgr/syncmgr.go b/pkg/syncmgr/syncmgr.go new file mode 100644 index 000000000..bb06dfa7e --- /dev/null +++ b/pkg/syncmgr/syncmgr.go @@ -0,0 +1,135 @@ +package syncmgr + +import ( + "fmt" + "time" + + "github.com/CityOfZion/neo-go/pkg/wire/payload" + "github.com/CityOfZion/neo-go/pkg/wire/util" +) + +type mode uint8 + +// Note: this is the unoptimised version without parallel sync +// The algorithm for the unoptimsied version is simple: +// Download 2000 headers, then download the blocks for those headers +// Once those blocks are downloaded, we repeat the process again +// Until we are nomore than one block behind the tip. +// Once this happens, we switch into normal mode. +//In normal mode, we have a timer on for X seconds and ask nodes for blocks and also to doublecheck +// if we are behind once the timer runs out. +// The timer restarts whenever we receive a block. +// The parameter X should be approximately the time it takes the network to reach consensus + +//blockTimer approximates to how long it takes to reach consensus and propagate +// a block in the network. Once a node has synchronised with the network, he will +// ask the network for a newblock every blockTimer +const blockTimer = 20 * time.Second + +// trailingHeight indicates how many blocks the node has to be behind by +// before he switches to headersMode. +const trailingHeight = 100 + +// indicates how many blocks the node has to be behind by +// before he switches to normalMode and fetches blocks every X seconds. +const cruiseHeight = 0 + +const ( + headersMode mode = 1 + blockMode mode = 2 + normalMode mode = 3 +) + +//Syncmgr keeps the node in sync with the rest of the network +type Syncmgr struct { + syncmode mode + cfg *Config + timer *time.Timer + + // headerHash is the hash of the last header in the last OnHeaders message that we received. + // When receiving blocks, we can use this to determine whether the node has downloaded + // all of the blocks for the last headers messages + headerHash util.Uint256 +} + +// New creates a new sync manager +func New(cfg *Config) *Syncmgr { + + newBlockTimer := time.AfterFunc(blockTimer, func() { + cfg.AskForNewBlocks() + }) + newBlockTimer.Stop() + + return &Syncmgr{ + syncmode: headersMode, + cfg: cfg, + timer: newBlockTimer, + } +} + +// OnHeader is called when the node receives a headers message +func (s *Syncmgr) OnHeader(peer SyncPeer, msg *payload.HeadersMessage) error { + + // XXX(Optimisation): First check if we actually need these headers + // Check the last header in msg and then check what our latest header that was saved is + // If our latest header is above the lastHeader, then we do not save it + // We could also have that our latest header is above only some of the headers. + // In this case, we should remove the headers that we already have + + if len(msg.Headers) == 0 { + // XXX: Increment banScore for this peer, for sending empty headers message + return nil + } + + var err error + + switch s.syncmode { + case headersMode: + err = s.headersModeOnHeaders(peer, msg.Headers) + case blockMode: + err = s.blockModeOnHeaders(peer, msg.Headers) + case normalMode: + err = s.normalModeOnHeaders(peer, msg.Headers) + default: + err = s.headersModeOnHeaders(peer, msg.Headers) + } + + // XXX(Kev):The only meaningful error here would be if the peer + // we re-requested blocks from failed. In the next iteration, this will be handled + // by the peer manager, who will only return an error, if we are connected to no peers. + // Upon re-alising this, the node will then send out GetAddresses to the network and + // syncing will be resumed, once we find peers to connect to. + + hdr := msg.Headers[len(msg.Headers)-1] + fmt.Printf("Finished processing headers. LastHash in set was: %s\n ", hdr.Hash.ReverseString()) + + return err +} + +// OnBlock is called when the node receives a block +func (s *Syncmgr) OnBlock(peer SyncPeer, msg *payload.BlockMessage) error { + fmt.Printf("Block received with height %d\n", msg.Block.Index) + + var err error + + switch s.syncmode { + case headersMode: + err = s.headersModeOnBlock(peer, msg.Block) + case blockMode: + err = s.blockModeOnBlock(peer, msg.Block) + case normalMode: + err = s.normalModeOnBlock(peer, msg.Block) + default: + err = s.headersModeOnBlock(peer, msg.Block) + } + + fmt.Printf("Processed Block with height %d\n", msg.Block.Index) + + return err +} + +//IsCurrent returns true if the node is currently +// synced up with the network +func (s *Syncmgr) IsCurrent() bool { + return s.syncmode == normalMode +} diff --git a/pkg/syncmgr/syncmgr_onblock_test.go b/pkg/syncmgr/syncmgr_onblock_test.go new file mode 100644 index 000000000..d5b79f0b3 --- /dev/null +++ b/pkg/syncmgr/syncmgr_onblock_test.go @@ -0,0 +1,97 @@ +package syncmgr + +import ( + "testing" + + "github.com/CityOfZion/neo-go/pkg/chain" + + "github.com/CityOfZion/neo-go/pkg/wire/payload" + "github.com/stretchr/testify/assert" +) + +func TestHeadersModeOnBlock(t *testing.T) { + + syncmgr, helper := setupSyncMgr(headersMode) + + syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) + + // In headerMode, we do nothing + assert.Equal(t, 0, helper.blocksProcessed) +} + +func TestBlockModeOnBlock(t *testing.T) { + + syncmgr, helper := setupSyncMgr(blockMode) + + syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) + + // When a block is received in blockMode, it is processed + assert.Equal(t, 1, helper.blocksProcessed) +} +func TestNormalModeOnBlock(t *testing.T) { + + syncmgr, helper := setupSyncMgr(normalMode) + + syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) + + // When a block is received in normal, it is processed + assert.Equal(t, 1, helper.blocksProcessed) +} + +func TestBlockModeToNormalMode(t *testing.T) { + + syncmgr, _ := setupSyncMgr(blockMode) + + peer := &mockPeer{ + height: 100, + } + + blkMessage := randomBlockMessage(t, 100) + + syncmgr.OnBlock(peer, blkMessage) + + // We should switch to normal mode, since the block + //we received is close to the height of the peer. See cruiseHeight + assert.Equal(t, normalMode, syncmgr.syncmode) + +} +func TestBlockModeStayInBlockMode(t *testing.T) { + + syncmgr, _ := setupSyncMgr(blockMode) + + // We need our latest know hash to not be equal to the hash + // of the block we received, to stay in blockmode + syncmgr.headerHash = randomUint256(t) + + peer := &mockPeer{ + height: 2000, + } + + blkMessage := randomBlockMessage(t, 100) + + syncmgr.OnBlock(peer, blkMessage) + + // We should stay in block mode, since the block we received is + // still quite far behind the peers height + assert.Equal(t, blockMode, syncmgr.syncmode) +} +func TestBlockModeAlreadyExistsErr(t *testing.T) { + + syncmgr, helper := setupSyncMgr(blockMode) + helper.err = chain.ErrBlockAlreadyExists + + syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 100)) + + assert.Equal(t, 0, helper.blockFetchRequest) + + // If we have a block already exists in blockmode, then we + // switch back to headers mode. + assert.Equal(t, headersMode, syncmgr.syncmode) +} + +func randomBlockMessage(t *testing.T, height uint32) *payload.BlockMessage { + blockMessage, err := payload.NewBlockMessage() + blockMessage.BlockBase.Index = height + assert.Nil(t, err) + return blockMessage +} diff --git a/pkg/syncmgr/syncmgr_onheaders_test.go b/pkg/syncmgr/syncmgr_onheaders_test.go new file mode 100644 index 000000000..2f60a4b72 --- /dev/null +++ b/pkg/syncmgr/syncmgr_onheaders_test.go @@ -0,0 +1,117 @@ +package syncmgr + +import ( + "testing" + + "github.com/CityOfZion/neo-go/pkg/chain" + + "github.com/stretchr/testify/assert" + + "github.com/CityOfZion/neo-go/pkg/wire/util" +) + +func TestHeadersModeOnHeaders(t *testing.T) { + + syncmgr, helper := setupSyncMgr(headersMode) + + syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 0)) + + // Since there were no headers, we should have exited early and processed nothing + assert.Equal(t, 0, helper.headersProcessed) + + // ProcessHeaders should have been called once to process all 100 headers + syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100)) + assert.Equal(t, 100, helper.headersProcessed) + + // Mode should now be blockMode + assert.Equal(t, blockMode, syncmgr.syncmode) + +} + +func TestBlockModeOnHeaders(t *testing.T) { + syncmgr, helper := setupSyncMgr(blockMode) + + // If we receive a header in blockmode, no headers will be processed + syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100)) + assert.Equal(t, 0, helper.headersProcessed) +} +func TestNormalModeOnHeadersMaxHeaders(t *testing.T) { + syncmgr, helper := setupSyncMgr(normalMode) + + // If we receive a header in normalmode, headers will be processed + syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 2000)) + assert.Equal(t, 2000, helper.headersProcessed) + + // Mode should now be headersMode since we received 2000 headers + assert.Equal(t, headersMode, syncmgr.syncmode) +} + +// This differs from the previous function in that +//we did not receive the max amount of headers +func TestNormalModeOnHeaders(t *testing.T) { + syncmgr, helper := setupSyncMgr(normalMode) + + // If we receive a header in normalmode, headers will be processed + syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) + assert.Equal(t, 200, helper.headersProcessed) + + // Because we did not receive 2000 headers, we switch to blockMode + assert.Equal(t, blockMode, syncmgr.syncmode) +} + +func TestLastHeaderUpdates(t *testing.T) { + syncmgr, _ := setupSyncMgr(headersMode) + + hdrsMessage := randomHeadersMessage(t, 200) + hdrs := hdrsMessage.Headers + lastHeader := hdrs[len(hdrs)-1] + + syncmgr.OnHeader(&mockPeer{}, hdrsMessage) + + // Headers are processed in headersMode + // Last header should be updated + assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash)) + + // Change mode to blockMode and reset lastHeader + syncmgr.syncmode = blockMode + syncmgr.headerHash = util.Uint256{} + + syncmgr.OnHeader(&mockPeer{}, hdrsMessage) + + // header should not be changed + assert.False(t, syncmgr.headerHash.Equals(lastHeader.Hash)) + + // Change mode to normalMode and reset lastHeader + syncmgr.syncmode = normalMode + syncmgr.headerHash = util.Uint256{} + + syncmgr.OnHeader(&mockPeer{}, hdrsMessage) + + // headers are processed in normalMode + // hash should be updated + assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash)) + +} + +func TestHeadersModeOnHeadersErr(t *testing.T) { + + syncmgr, helper := setupSyncMgr(headersMode) + helper.err = &chain.ValidationError{} + + syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) + + // On a validation error, we should request for another peer + // to send us these headers + assert.Equal(t, 1, helper.headersFetchRequest) +} + +func TestNormalModeOnHeadersErr(t *testing.T) { + syncmgr, helper := setupSyncMgr(normalMode) + helper.err = &chain.ValidationError{} + + syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) + + // On a validation error, we should request for another peer + // to send us these headers + assert.Equal(t, 1, helper.headersFetchRequest) +}