Syncmgr: Implement synchronisation manager (#249)

* [syncmgr]

- Add blockmode, normal mode, headermode
- Add config file
- Add test files
- removed RequestBlocks and RequestHeaders from peers, as we will use
the peermanager for this
- OnHeaders and OnBlock in syncmgr, now return errors
- refactored all tests to use a convenience method to return a syncmgr
and testHelper
This commit is contained in:
decentralisedkev 2019-03-28 21:22:17 +00:00 committed by GitHub
parent 493d8f3d95
commit cb21c66316
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 664 additions and 0 deletions

55
pkg/syncmgr/blockmode.go Normal file
View file

@ -0,0 +1,55 @@
package syncmgr
import (
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
// blockModeOnBlock is called when the sync manager is block mode
// and receives a block.
func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error {
// Process Block
err := s.cfg.ProcessBlock(block)
if err == chain.ErrFutureBlock {
// XXX(Optimisation): We can cache future blocks in blockmode, if we have the corresponding header
// We can have the server cache them and sort out the semantics for when to send them to the chain
// Server can listen on chain for when a new block is saved
// or we could embed a struct in this syncmgr called blockCache, syncmgr can just tell it when it has processed
//a block and we can call ProcessBlock
return err
}
if err != nil && err != chain.ErrBlockAlreadyExists {
return s.cfg.FetchBlockAgain(block.Hash)
}
// Check if blockhashReceived == the header hash from last get headers this node performed
// if not then increment and request next block
if s.headerHash != block.Hash {
nextHash, err := s.cfg.GetNextBlockHash()
if err != nil {
return err
}
err = s.cfg.RequestBlock(nextHash)
return err
}
// If we are caught up then go into normal mode
diff := peer.Height() - block.Index
if diff <= cruiseHeight {
s.syncmode = normalMode
s.timer.Reset(blockTimer)
return nil
}
// If not then we go back into headersMode and request more headers.
s.syncmode = headersMode
return s.cfg.RequestHeaders(block.Hash)
}
func (s *Syncmgr) blockModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
// We ignore headers when in this mode
return nil
}

47
pkg/syncmgr/config.go Normal file
View file

@ -0,0 +1,47 @@
package syncmgr
import (
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
// Config is the configuration file for the sync manager
type Config struct {
// Chain functions
ProcessBlock func(msg payload.Block) error
ProcessHeaders func(hdrs []*payload.BlockBase) error
// RequestHeaders will send a getHeaders request
// with the hash passed in as a parameter
RequestHeaders func(hash util.Uint256) error
//RequestBlock will send a getdata request for the block
// with the hash passed as a parameter
RequestBlock func(hash util.Uint256) error
// GetNextBlockHash returns the block hash of the header infront of thr block
// at the tip of this nodes chain. This assumes that the node is not in sync
GetNextBlockHash func() (util.Uint256, error)
// GetBestBlockHash gets the block hash of the last saved block.
GetBestBlockHash func() (util.Uint256, error)
// AskForNewBlocks will send out a message to the network
// asking for new blocks
AskForNewBlocks func()
// FetchHeadersAgain is called when a peer has provided headers that have not
// validated properly. We pass in the hash of the first header
FetchHeadersAgain func(util.Uint256) error
// FetchHeadersAgain is called when a peer has provided a block that has not
// validated properly. We pass in the hash of the block
FetchBlockAgain func(util.Uint256) error
}
// SyncPeer represents a peer on the network
// that this node can sync with
type SyncPeer interface {
Height() uint32
}

41
pkg/syncmgr/headermode.go Normal file
View file

@ -0,0 +1,41 @@
package syncmgr
import (
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
// headersModeOnHeaders is called when the sync manager is headers mode
// and receives a header.
func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
// If we are in Headers mode, then we just need to process the headers
// Note: For the un-optimised version, we move straight to blocksOnly mode
firstHash := hdrs[0].Hash
err := s.cfg.ProcessHeaders(hdrs)
if err == nil {
// Update syncmgr last header
s.headerHash = hdrs[len(hdrs)-1].Hash
s.syncmode = blockMode
return s.cfg.RequestBlock(firstHash)
}
// Check whether it is a validation error, or a database error
if _, ok := err.(*chain.ValidationError); ok {
// If we get a validation error we re-request the headers
// the method will automatically fetch from a different peer
// XXX: Add increment banScore for this peer
return s.cfg.FetchHeadersAgain(firstHash)
}
// This means it is a database error. We have no way to recover from this.
panic(err.Error())
}
// headersModeOnBlock is called when the sync manager is headers mode
// and receives a block.
func (s *Syncmgr) headersModeOnBlock(peer SyncPeer, block payload.Block) error {
// While in headers mode, ignore any blocks received
return nil
}

View file

@ -0,0 +1,113 @@
package syncmgr
import (
"crypto/rand"
"testing"
"github.com/stretchr/testify/assert"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
type syncTestHelper struct {
blocksProcessed int
headersProcessed int
newBlockRequest int
headersFetchRequest int
blockFetchRequest int
err error
}
func (s *syncTestHelper) ProcessBlock(msg payload.Block) error {
s.blocksProcessed++
return s.err
}
func (s *syncTestHelper) ProcessHeaders(hdrs []*payload.BlockBase) error {
s.headersProcessed = s.headersProcessed + len(hdrs)
return s.err
}
func (s *syncTestHelper) GetNextBlockHash() (util.Uint256, error) {
return util.Uint256{}, s.err
}
func (s *syncTestHelper) AskForNewBlocks() {
s.newBlockRequest++
}
func (s *syncTestHelper) FetchHeadersAgain(util.Uint256) error {
s.headersFetchRequest++
return s.err
}
func (s *syncTestHelper) FetchBlockAgain(util.Uint256) error {
s.blockFetchRequest++
return s.err
}
func (s *syncTestHelper) RequestBlock(util.Uint256) error {
s.blockFetchRequest++
return s.err
}
func (s *syncTestHelper) RequestHeaders(util.Uint256) error {
s.headersFetchRequest++
return s.err
}
type mockPeer struct {
height uint32
}
func (p *mockPeer) Height() uint32 { return p.height }
func randomHeadersMessage(t *testing.T, num int) *payload.HeadersMessage {
var hdrs []*payload.BlockBase
for i := 0; i < num; i++ {
hash := randomUint256(t)
hdr := &payload.BlockBase{Hash: hash}
hdrs = append(hdrs, hdr)
}
hdrsMsg, err := payload.NewHeadersMessage()
assert.Nil(t, err)
hdrsMsg.Headers = hdrs
return hdrsMsg
}
func randomUint256(t *testing.T) util.Uint256 {
hash := make([]byte, 32)
_, err := rand.Read(hash)
assert.Nil(t, err)
u, err := util.Uint256DecodeBytes(hash)
assert.Nil(t, err)
return u
}
func setupSyncMgr(mode mode) (*Syncmgr, *syncTestHelper) {
helper := &syncTestHelper{}
cfg := &Config{
ProcessBlock: helper.ProcessBlock,
ProcessHeaders: helper.ProcessHeaders,
GetNextBlockHash: helper.GetNextBlockHash,
AskForNewBlocks: helper.AskForNewBlocks,
FetchHeadersAgain: helper.FetchHeadersAgain,
FetchBlockAgain: helper.FetchBlockAgain,
RequestBlock: helper.RequestBlock,
RequestHeaders: helper.RequestHeaders,
}
syncmgr := New(cfg)
syncmgr.syncmode = mode
return syncmgr, helper
}

59
pkg/syncmgr/normalmode.go Normal file
View file

@ -0,0 +1,59 @@
package syncmgr
import (
"github.com/CityOfZion/neo-go/pkg/wire/payload"
)
func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
// If in normal mode, first process the headers
err := s.cfg.ProcessHeaders(hdrs)
if err != nil {
// If something went wrong with processing the headers
// Ask another peer for the headers.
//XXX: Increment banscore for this peer
return s.cfg.FetchHeadersAgain(hdrs[0].Hash)
}
lenHeaders := len(hdrs)
firstHash := hdrs[0].Hash
lastHash := hdrs[lenHeaders-1].Hash
// Update syncmgr latest header
s.headerHash = lastHash
// If there are 2k headers, then ask for more headers and switch back to headers mode.
if lenHeaders == 2000 {
s.syncmode = headersMode
return s.cfg.RequestHeaders(lastHash)
}
// Ask for the corresponding block iff there is < 2k headers
// then switch to blocksMode
// Bounds state that len > 1 && len!= 2000 & maxHeadersInMessage == 2000
// This means that we have less than 2k headers
s.syncmode = blockMode
return s.cfg.RequestBlock(firstHash)
}
// normalModeOnBlock is called when the sync manager is normal mode
// and receives a block.
func (s *Syncmgr) normalModeOnBlock(peer SyncPeer, block payload.Block) error {
// stop the timer that periodically asks for blocks
s.timer.Stop()
// process block
err := s.cfg.ProcessBlock(block)
if err != nil {
s.timer.Reset(blockTimer)
return s.cfg.FetchBlockAgain(block.Hash)
}
diff := peer.Height() - block.Index
if diff > trailingHeight {
s.syncmode = headersMode
return s.cfg.RequestHeaders(block.Hash)
}
s.timer.Reset(blockTimer)
return nil
}

135
pkg/syncmgr/syncmgr.go Normal file
View file

@ -0,0 +1,135 @@
package syncmgr
import (
"fmt"
"time"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
type mode uint8
// Note: this is the unoptimised version without parallel sync
// The algorithm for the unoptimsied version is simple:
// Download 2000 headers, then download the blocks for those headers
// Once those blocks are downloaded, we repeat the process again
// Until we are nomore than one block behind the tip.
// Once this happens, we switch into normal mode.
//In normal mode, we have a timer on for X seconds and ask nodes for blocks and also to doublecheck
// if we are behind once the timer runs out.
// The timer restarts whenever we receive a block.
// The parameter X should be approximately the time it takes the network to reach consensus
//blockTimer approximates to how long it takes to reach consensus and propagate
// a block in the network. Once a node has synchronised with the network, he will
// ask the network for a newblock every blockTimer
const blockTimer = 20 * time.Second
// trailingHeight indicates how many blocks the node has to be behind by
// before he switches to headersMode.
const trailingHeight = 100
// indicates how many blocks the node has to be behind by
// before he switches to normalMode and fetches blocks every X seconds.
const cruiseHeight = 0
const (
headersMode mode = 1
blockMode mode = 2
normalMode mode = 3
)
//Syncmgr keeps the node in sync with the rest of the network
type Syncmgr struct {
syncmode mode
cfg *Config
timer *time.Timer
// headerHash is the hash of the last header in the last OnHeaders message that we received.
// When receiving blocks, we can use this to determine whether the node has downloaded
// all of the blocks for the last headers messages
headerHash util.Uint256
}
// New creates a new sync manager
func New(cfg *Config) *Syncmgr {
newBlockTimer := time.AfterFunc(blockTimer, func() {
cfg.AskForNewBlocks()
})
newBlockTimer.Stop()
return &Syncmgr{
syncmode: headersMode,
cfg: cfg,
timer: newBlockTimer,
}
}
// OnHeader is called when the node receives a headers message
func (s *Syncmgr) OnHeader(peer SyncPeer, msg *payload.HeadersMessage) error {
// XXX(Optimisation): First check if we actually need these headers
// Check the last header in msg and then check what our latest header that was saved is
// If our latest header is above the lastHeader, then we do not save it
// We could also have that our latest header is above only some of the headers.
// In this case, we should remove the headers that we already have
if len(msg.Headers) == 0 {
// XXX: Increment banScore for this peer, for sending empty headers message
return nil
}
var err error
switch s.syncmode {
case headersMode:
err = s.headersModeOnHeaders(peer, msg.Headers)
case blockMode:
err = s.blockModeOnHeaders(peer, msg.Headers)
case normalMode:
err = s.normalModeOnHeaders(peer, msg.Headers)
default:
err = s.headersModeOnHeaders(peer, msg.Headers)
}
// XXX(Kev):The only meaningful error here would be if the peer
// we re-requested blocks from failed. In the next iteration, this will be handled
// by the peer manager, who will only return an error, if we are connected to no peers.
// Upon re-alising this, the node will then send out GetAddresses to the network and
// syncing will be resumed, once we find peers to connect to.
hdr := msg.Headers[len(msg.Headers)-1]
fmt.Printf("Finished processing headers. LastHash in set was: %s\n ", hdr.Hash.ReverseString())
return err
}
// OnBlock is called when the node receives a block
func (s *Syncmgr) OnBlock(peer SyncPeer, msg *payload.BlockMessage) error {
fmt.Printf("Block received with height %d\n", msg.Block.Index)
var err error
switch s.syncmode {
case headersMode:
err = s.headersModeOnBlock(peer, msg.Block)
case blockMode:
err = s.blockModeOnBlock(peer, msg.Block)
case normalMode:
err = s.normalModeOnBlock(peer, msg.Block)
default:
err = s.headersModeOnBlock(peer, msg.Block)
}
fmt.Printf("Processed Block with height %d\n", msg.Block.Index)
return err
}
//IsCurrent returns true if the node is currently
// synced up with the network
func (s *Syncmgr) IsCurrent() bool {
return s.syncmode == normalMode
}

View file

@ -0,0 +1,97 @@
package syncmgr
import (
"testing"
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/stretchr/testify/assert"
)
func TestHeadersModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
// In headerMode, we do nothing
assert.Equal(t, 0, helper.blocksProcessed)
}
func TestBlockModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
// When a block is received in blockMode, it is processed
assert.Equal(t, 1, helper.blocksProcessed)
}
func TestNormalModeOnBlock(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode)
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
// When a block is received in normal, it is processed
assert.Equal(t, 1, helper.blocksProcessed)
}
func TestBlockModeToNormalMode(t *testing.T) {
syncmgr, _ := setupSyncMgr(blockMode)
peer := &mockPeer{
height: 100,
}
blkMessage := randomBlockMessage(t, 100)
syncmgr.OnBlock(peer, blkMessage)
// We should switch to normal mode, since the block
//we received is close to the height of the peer. See cruiseHeight
assert.Equal(t, normalMode, syncmgr.syncmode)
}
func TestBlockModeStayInBlockMode(t *testing.T) {
syncmgr, _ := setupSyncMgr(blockMode)
// We need our latest know hash to not be equal to the hash
// of the block we received, to stay in blockmode
syncmgr.headerHash = randomUint256(t)
peer := &mockPeer{
height: 2000,
}
blkMessage := randomBlockMessage(t, 100)
syncmgr.OnBlock(peer, blkMessage)
// We should stay in block mode, since the block we received is
// still quite far behind the peers height
assert.Equal(t, blockMode, syncmgr.syncmode)
}
func TestBlockModeAlreadyExistsErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode)
helper.err = chain.ErrBlockAlreadyExists
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 100))
assert.Equal(t, 0, helper.blockFetchRequest)
// If we have a block already exists in blockmode, then we
// switch back to headers mode.
assert.Equal(t, headersMode, syncmgr.syncmode)
}
func randomBlockMessage(t *testing.T, height uint32) *payload.BlockMessage {
blockMessage, err := payload.NewBlockMessage()
blockMessage.BlockBase.Index = height
assert.Nil(t, err)
return blockMessage
}

View file

@ -0,0 +1,117 @@
package syncmgr
import (
"testing"
"github.com/CityOfZion/neo-go/pkg/chain"
"github.com/stretchr/testify/assert"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
func TestHeadersModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode)
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 0))
// Since there were no headers, we should have exited early and processed nothing
assert.Equal(t, 0, helper.headersProcessed)
// ProcessHeaders should have been called once to process all 100 headers
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100))
assert.Equal(t, 100, helper.headersProcessed)
// Mode should now be blockMode
assert.Equal(t, blockMode, syncmgr.syncmode)
}
func TestBlockModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(blockMode)
// If we receive a header in blockmode, no headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100))
assert.Equal(t, 0, helper.headersProcessed)
}
func TestNormalModeOnHeadersMaxHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode)
// If we receive a header in normalmode, headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 2000))
assert.Equal(t, 2000, helper.headersProcessed)
// Mode should now be headersMode since we received 2000 headers
assert.Equal(t, headersMode, syncmgr.syncmode)
}
// This differs from the previous function in that
//we did not receive the max amount of headers
func TestNormalModeOnHeaders(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode)
// If we receive a header in normalmode, headers will be processed
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
assert.Equal(t, 200, helper.headersProcessed)
// Because we did not receive 2000 headers, we switch to blockMode
assert.Equal(t, blockMode, syncmgr.syncmode)
}
func TestLastHeaderUpdates(t *testing.T) {
syncmgr, _ := setupSyncMgr(headersMode)
hdrsMessage := randomHeadersMessage(t, 200)
hdrs := hdrsMessage.Headers
lastHeader := hdrs[len(hdrs)-1]
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
// Headers are processed in headersMode
// Last header should be updated
assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash))
// Change mode to blockMode and reset lastHeader
syncmgr.syncmode = blockMode
syncmgr.headerHash = util.Uint256{}
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
// header should not be changed
assert.False(t, syncmgr.headerHash.Equals(lastHeader.Hash))
// Change mode to normalMode and reset lastHeader
syncmgr.syncmode = normalMode
syncmgr.headerHash = util.Uint256{}
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
// headers are processed in normalMode
// hash should be updated
assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash))
}
func TestHeadersModeOnHeadersErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(headersMode)
helper.err = &chain.ValidationError{}
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
// On a validation error, we should request for another peer
// to send us these headers
assert.Equal(t, 1, helper.headersFetchRequest)
}
func TestNormalModeOnHeadersErr(t *testing.T) {
syncmgr, helper := setupSyncMgr(normalMode)
helper.err = &chain.ValidationError{}
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
// On a validation error, we should request for another peer
// to send us these headers
assert.Equal(t, 1, helper.headersFetchRequest)
}