network: plug in dBFT library

This commit is contained in:
Evgenii Stratonikov 2019-11-15 13:32:40 +03:00
parent 5ad665bc37
commit fdd5276d3e
29 changed files with 1415 additions and 142 deletions

View file

@ -16,6 +16,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
const (
@ -61,6 +62,8 @@ type (
register chan Peer
unregister chan peerDrop
quit chan struct{}
connected *atomic.Bool
}
peerDrop struct {
@ -87,9 +90,21 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server {
register: make(chan Peer),
unregister: make(chan peerDrop),
peers: make(map[Peer]bool),
consensus: consensus.NewService(),
connected: atomic.NewBool(false),
}
srv, err := consensus.NewService(consensus.Config{
Broadcast: s.handleNewPayload,
Chain: chain,
RequestTx: s.requestTx,
Wallet: config.Wallet,
})
if err != nil {
return nil
}
s.consensus = srv
if s.MinPeers <= 0 {
log.WithFields(log.Fields{
"MinPeers configured": s.MinPeers,
@ -233,10 +248,31 @@ func (s *Server) run() {
}
}
func (s *Server) tryStartConsensus() {
if s.Wallet == nil || s.connected.Load() {
return
}
if s.PeerCount() >= s.MinPeers {
log.Info("minimum amount of peers were connected to")
if s.connected.CAS(false, true) {
s.consensus.Start()
}
}
}
// Peers returns the current list of peers connected to
// the server.
func (s *Server) Peers() map[Peer]bool {
return s.peers
s.lock.RLock()
defer s.lock.RUnlock()
peers := make(map[Peer]bool, len(s.peers))
for k, v := range s.peers {
peers[k] = v
}
return peers
}
// PeerCount returns the number of current connected peers.
@ -394,6 +430,13 @@ func (s *Server) handleConsensusCmd(cp *consensus.Payload) error {
return nil
}
// handleTxCmd processes received transaction.
// It never returns an error.
func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
s.consensus.OnTransaction(tx)
return nil
}
// handleAddrCmd will process received addresses.
func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error {
for _, a := range addrs.Addrs {
@ -485,6 +528,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
case CMDConsensus:
cp := msg.Payload.(*consensus.Payload)
return s.handleConsensusCmd(cp)
case CMDTX:
tx := msg.Payload.(*transaction.Transaction)
return s.handleTxCmd(tx)
case CMDVersion, CMDVerack:
return fmt.Errorf("received '%s' after the handshake", msg.CommandType())
}
@ -499,6 +545,8 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
return err
}
go s.startProtocol(peer)
s.tryStartConsensus()
default:
return fmt.Errorf("received '%s' during handshake", msg.CommandType())
}
@ -506,6 +554,28 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
return nil
}
func (s *Server) handleNewPayload(p *consensus.Payload) {
s.relayInventory(payload.ConsensusType, p.Hash())
}
func (s *Server) requestTx(hashes ...util.Uint256) {
if len(hashes) == 0 {
return
}
s.relayInventory(payload.TXType, hashes...)
}
func (s *Server) relayInventory(t payload.InventoryType, hashes ...util.Uint256) {
for peer := range s.Peers() {
if !peer.Handshaked() {
continue
}
payload := payload.NewInventory(t, hashes)
s.RelayDirectly(peer, payload)
}
}
// RelayTxn a new transaction to the local node and the connected peers.
// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159
func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {