network: introduce (*Server).IsInSync, start consensus in synced state

We define synchronized state as a combination of minimum number of peers and
chain height being not behind of more than 2/3 of these peers.
This commit is contained in:
Roman Khimov 2020-02-14 20:46:05 +03:00
parent a2616cfafe
commit c5d54e9992

View file

@ -64,7 +64,7 @@ type (
unregister chan peerDrop unregister chan peerDrop
quit chan struct{} quit chan struct{}
connected *atomic.Bool consensusStarted *atomic.Bool
log *zap.Logger log *zap.Logger
} }
@ -88,17 +88,20 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
} }
s := &Server{ s := &Server{
ServerConfig: config, ServerConfig: config,
chain: chain, chain: chain,
id: randomID(), id: randomID(),
quit: make(chan struct{}), quit: make(chan struct{}),
register: make(chan Peer), register: make(chan Peer),
unregister: make(chan peerDrop), unregister: make(chan peerDrop),
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
connected: atomic.NewBool(false), consensusStarted: atomic.NewBool(false),
log: log, log: log,
} }
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, s.relayBlock) s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
s.tryStartConsensus()
s.relayBlock(b)
})
srv, err := consensus.NewService(consensus.Config{ srv, err := consensus.NewService(consensus.Config{
Logger: log, Logger: log,
@ -274,13 +277,13 @@ func (s *Server) runProto() {
} }
func (s *Server) tryStartConsensus() { func (s *Server) tryStartConsensus() {
if s.Wallet == nil || s.connected.Load() { if s.Wallet == nil || s.consensusStarted.Load() {
return return
} }
if s.HandshakedPeersCount() >= s.MinPeers { if s.IsInSync() {
s.log.Info("minimum amount of peers were connected to") s.log.Info("node reached synchronized state, starting consensus")
if s.connected.CAS(false, true) { if s.consensusStarted.CAS(false, true) {
s.consensus.Start() s.consensus.Start()
} }
} }
@ -336,6 +339,39 @@ func (s *Server) getVersionMsg() *Message {
return s.MkMsg(CMDVersion, payload) return s.MkMsg(CMDVersion, payload)
} }
// IsInSync answers the question of whether the server is in sync with the
// network or not (at least how the server itself sees it). The server operates
// with the data that it has, the number of peers (that has to be more than
// minimum number) and height of these peers (our chain has to be not lower
// than 2/3 of our peers have). Ideally we would check for the highest of the
// peers, but the problem is that they can lie to us and send whatever height
// they want to.
func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int
if s.MinPeers == 0 {
return true
}
ourLastBlock := s.chain.BlockHeight()
s.lock.RLock()
for p := range s.peers {
if p.Handshaked() {
peersNumber++
if ourLastBlock >= p.LastBlockIndex() {
notHigher++
}
}
}
s.lock.RUnlock()
// Checking bQueue would also be nice, but it can be filled with garbage
// easily at the moment.
return peersNumber >= s.MinPeers && (3*notHigher > 2*peersNumber) // && s.bQueue.length() == 0
}
// When a peer sends out his version we reply with verack after validating // When a peer sends out his version we reply with verack after validating
// the version. // the version.
func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {