diff --git a/pkg/network/server.go b/pkg/network/server.go index 5a45ed393..f238df365 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -64,7 +64,7 @@ type ( unregister chan peerDrop quit chan struct{} - connected *atomic.Bool + consensusStarted *atomic.Bool log *zap.Logger } @@ -88,17 +88,20 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* } s := &Server{ - ServerConfig: config, - chain: chain, - id: randomID(), - quit: make(chan struct{}), - register: make(chan Peer), - unregister: make(chan peerDrop), - peers: make(map[Peer]bool), - connected: atomic.NewBool(false), - log: log, + ServerConfig: config, + chain: chain, + id: randomID(), + quit: make(chan struct{}), + register: make(chan Peer), + unregister: make(chan peerDrop), + peers: make(map[Peer]bool), + consensusStarted: atomic.NewBool(false), + log: log, } - s.bQueue = newBlockQueue(maxBlockBatch, chain, log, s.relayBlock) + s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { + s.tryStartConsensus() + s.relayBlock(b) + }) srv, err := consensus.NewService(consensus.Config{ Logger: log, @@ -274,13 +277,13 @@ func (s *Server) runProto() { } func (s *Server) tryStartConsensus() { - if s.Wallet == nil || s.connected.Load() { + if s.Wallet == nil || s.consensusStarted.Load() { return } - if s.HandshakedPeersCount() >= s.MinPeers { - s.log.Info("minimum amount of peers were connected to") - if s.connected.CAS(false, true) { + if s.IsInSync() { + s.log.Info("node reached synchronized state, starting consensus") + if s.consensusStarted.CAS(false, true) { s.consensus.Start() } } @@ -336,6 +339,39 @@ func (s *Server) getVersionMsg() *Message { return s.MkMsg(CMDVersion, payload) } +// IsInSync answers the question of whether the server is in sync with the +// network or not (at least how the server itself sees it). The server operates +// with the data that it has, the number of peers (that has to be more than +// minimum number) and height of these peers (our chain has to be not lower +// than 2/3 of our peers have). Ideally we would check for the highest of the +// peers, but the problem is that they can lie to us and send whatever height +// they want to. +func (s *Server) IsInSync() bool { + var peersNumber int + var notHigher int + + if s.MinPeers == 0 { + return true + } + + ourLastBlock := s.chain.BlockHeight() + + s.lock.RLock() + for p := range s.peers { + if p.Handshaked() { + peersNumber++ + if ourLastBlock >= p.LastBlockIndex() { + notHigher++ + } + } + } + s.lock.RUnlock() + + // Checking bQueue would also be nice, but it can be filled with garbage + // easily at the moment. + return peersNumber >= s.MinPeers && (3*notHigher > 2*peersNumber) // && s.bQueue.length() == 0 +} + // When a peer sends out his version we reply with verack after validating // the version. func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {