neoneo-go/pkg/syncmgr/syncmgr.go
BlockChainDev c401247af9 [syncmgr]
- refactor syncmgr for blockpool
- add nextBlockIndex so that we can add blocks to the blockPool by
comparing their index
- add processBlock helper method,wraps around cfg.ProcessBlock and increments the nextBlockIndex
internally
2019-03-30 21:34:27 +00:00

152 lines
4.5 KiB
Go

package syncmgr
import (
"fmt"
"sync"
"time"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
type mode uint8
// Note: this is the unoptimised version without parallel sync
// The algorithm for the unoptimsied version is simple:
// Download 2000 headers, then download the blocks for those headers
// Once those blocks are downloaded, we repeat the process again
// Until we are nomore than one block behind the tip.
// Once this happens, we switch into normal mode.
//In normal mode, we have a timer on for X seconds and ask nodes for blocks and also to doublecheck
// if we are behind once the timer runs out.
// The timer restarts whenever we receive a block.
// The parameter X should be approximately the time it takes the network to reach consensus
//blockTimer approximates to how long it takes to reach consensus and propagate
// a block in the network. Once a node has synchronised with the network, he will
// ask the network for a newblock every blockTimer
const blockTimer = 20 * time.Second
// trailingHeight indicates how many blocks the node has to be behind by
// before he switches to headersMode.
const trailingHeight = 100
// indicates how many blocks the node has to be behind by
// before he switches to normalMode and fetches blocks every X seconds.
const cruiseHeight = 0
const (
headersMode mode = 1
blockMode mode = 2
normalMode mode = 3
)
//Syncmgr keeps the node in sync with the rest of the network
type Syncmgr struct {
syncmode mode
cfg *Config
timer *time.Timer
// headerHash is the hash of the last header in the last OnHeaders message that we received.
// When receiving blocks, we can use this to determine whether the node has downloaded
// all of the blocks for the last headers messages
headerHash util.Uint256
poolLock sync.Mutex
blockPool []payload.Block
nextBlockIndex uint32
}
// New creates a new sync manager
func New(cfg *Config, nextBlockIndex uint32) *Syncmgr {
newBlockTimer := time.AfterFunc(blockTimer, func() {
cfg.AskForNewBlocks()
})
newBlockTimer.Stop()
return &Syncmgr{
syncmode: headersMode,
cfg: cfg,
timer: newBlockTimer,
nextBlockIndex: nextBlockIndex,
}
}
// OnHeader is called when the node receives a headers message
func (s *Syncmgr) OnHeader(peer SyncPeer, msg *payload.HeadersMessage) error {
// XXX(Optimisation): First check if we actually need these headers
// Check the last header in msg and then check what our latest header that was saved is
// If our latest header is above the lastHeader, then we do not save it
// We could also have that our latest header is above only some of the headers.
// In this case, we should remove the headers that we already have
if len(msg.Headers) == 0 {
// XXX: Increment banScore for this peer, for sending empty headers message
return nil
}
var err error
switch s.syncmode {
case headersMode:
err = s.headersModeOnHeaders(peer, msg.Headers)
case blockMode:
err = s.blockModeOnHeaders(peer, msg.Headers)
case normalMode:
err = s.normalModeOnHeaders(peer, msg.Headers)
default:
err = s.headersModeOnHeaders(peer, msg.Headers)
}
// XXX(Kev):The only meaningful error here would be if the peer
// we re-requested blocks from failed. In the next iteration, this will be handled
// by the peer manager, who will only return an error, if we are connected to no peers.
// Upon re-alising this, the node will then send out GetAddresses to the network and
// syncing will be resumed, once we find peers to connect to.
hdr := msg.Headers[len(msg.Headers)-1]
fmt.Printf("Finished processing headers. LastHash in set was: %s\n ", hdr.Hash.ReverseString())
return err
}
// OnBlock is called when the node receives a block
func (s *Syncmgr) OnBlock(peer SyncPeer, msg *payload.BlockMessage) error {
fmt.Printf("Block received with height %d\n", msg.Block.Index)
var err error
switch s.syncmode {
case headersMode:
err = s.headersModeOnBlock(peer, msg.Block)
case blockMode:
err = s.blockModeOnBlock(peer, msg.Block)
case normalMode:
err = s.normalModeOnBlock(peer, msg.Block)
default:
err = s.headersModeOnBlock(peer, msg.Block)
}
fmt.Printf("Processed Block with height %d\n", msg.Block.Index)
return err
}
//IsCurrent returns true if the node is currently
// synced up with the network
func (s *Syncmgr) IsCurrent() bool {
return s.syncmode == normalMode
}
func (s *Syncmgr) processBlock(block payload.Block) error {
err := s.cfg.ProcessBlock(block)
if err != nil {
return err
}
s.nextBlockIndex++
return nil
}