network: introduce Server's MkMsg()

That wraps NewMessage() for a configured network.
This commit is contained in:
Roman Khimov 2020-01-21 17:31:51 +03:00
parent 1f672e0da7
commit 34b863d645
2 changed files with 21 additions and 15 deletions

View file

@ -152,6 +152,12 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) *S
return s return s
} }
// MkMsg creates a new message based on the server configured network and given
// parameters.
func (s *Server) MkMsg(cmd CommandType, p payload.Payload) *Message {
return NewMessage(s.Net, cmd, p)
}
// ID returns the servers ID. // ID returns the servers ID.
func (s *Server) ID() uint32 { func (s *Server) ID() uint32 {
return s.id return s.id
@ -198,7 +204,7 @@ func (s *Server) run() {
} }
if s.discovery.PoolCount() < minPoolCount { if s.discovery.PoolCount() < minPoolCount {
select { select {
case s.addrReq <- NewMessage(s.Net, CMDGetAddr, payload.NewNullPayload()): case s.addrReq <- s.MkMsg(CMDGetAddr, payload.NewNullPayload()):
// sent request // sent request
default: default:
// we have one in the queue already that is // we have one in the queue already that is
@ -316,7 +322,7 @@ func (s *Server) getVersionMsg() *Message {
s.chain.BlockHeight(), s.chain.BlockHeight(),
s.Relay, s.Relay,
) )
return NewMessage(s.Net, CMDVersion, payload) return s.MkMsg(CMDVersion, payload)
} }
// When a peer sends out his version we reply with verack after validating // When a peer sends out his version we reply with verack after validating
@ -339,7 +345,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
} }
} }
s.lock.RUnlock() s.lock.RUnlock()
return p.SendVersionAck(NewMessage(s.Net, CMDVerack, nil)) return p.SendVersionAck(s.MkMsg(CMDVerack, nil))
} }
// handleHeadersCmd processes the headers received from its peer. // handleHeadersCmd processes the headers received from its peer.
@ -367,7 +373,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
// handlePing processes ping request. // handlePing processes ping request.
func (s *Server) handlePing(p Peer, ping *payload.Ping) error { func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
return p.EnqueueMessage(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight()))) return p.EnqueueMessage(s.MkMsg(CMDPong, payload.NewPing(s.id, s.chain.BlockHeight())))
} }
// handlePing processes pong request. // handlePing processes pong request.
@ -401,7 +407,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
} }
} }
if len(reqHashes) > 0 { if len(reqHashes) > 0 {
msg := NewMessage(s.Net, CMDGetData, payload.NewInventory(inv.Type, reqHashes)) msg := s.MkMsg(CMDGetData, payload.NewInventory(inv.Type, reqHashes))
pkt, err := msg.Bytes() pkt, err := msg.Bytes()
if err != nil { if err != nil {
return err return err
@ -423,16 +429,16 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
case payload.TXType: case payload.TXType:
tx, _, err := s.chain.GetTransaction(hash) tx, _, err := s.chain.GetTransaction(hash)
if err == nil { if err == nil {
msg = NewMessage(s.Net, CMDTX, tx) msg = s.MkMsg(CMDTX, tx)
} }
case payload.BlockType: case payload.BlockType:
b, err := s.chain.GetBlock(hash) b, err := s.chain.GetBlock(hash)
if err == nil { if err == nil {
msg = NewMessage(s.Net, CMDBlock, b) msg = s.MkMsg(CMDBlock, b)
} }
case payload.ConsensusType: case payload.ConsensusType:
if cp := s.consensus.GetPayload(hash); cp != nil { if cp := s.consensus.GetPayload(hash); cp != nil {
msg = NewMessage(s.Net, CMDConsensus, cp) msg = s.MkMsg(CMDConsensus, cp)
} }
} }
if msg != nil { if msg != nil {
@ -475,7 +481,7 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error {
return nil return nil
} }
payload := payload.NewInventory(payload.BlockType, blockHashes) payload := payload.NewInventory(payload.BlockType, blockHashes)
msg := NewMessage(s.Net, CMDInv, payload) msg := s.MkMsg(CMDInv, payload)
return p.EnqueueMessage(msg) return p.EnqueueMessage(msg)
} }
@ -505,7 +511,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error {
if len(resp.Hdrs) == 0 { if len(resp.Hdrs) == 0 {
return nil return nil
} }
msg := NewMessage(s.Net, CMDHeaders, &resp) msg := s.MkMsg(CMDHeaders, &resp)
return p.EnqueueMessage(msg) return p.EnqueueMessage(msg)
} }
@ -547,7 +553,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
netaddr, _ := net.ResolveTCPAddr("tcp", addr) netaddr, _ := net.ResolveTCPAddr("tcp", addr)
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts) alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts)
} }
return p.EnqueueMessage(NewMessage(s.Net, CMDAddr, alist)) return p.EnqueueMessage(s.MkMsg(CMDAddr, alist))
} }
// requestHeaders sends a getheaders message to the peer. // requestHeaders sends a getheaders message to the peer.
@ -555,7 +561,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
func (s *Server) requestHeaders(p Peer) error { func (s *Server) requestHeaders(p Peer) error {
start := []util.Uint256{s.chain.CurrentHeaderHash()} start := []util.Uint256{s.chain.CurrentHeaderHash()}
payload := payload.NewGetBlocks(start, util.Uint256{}) payload := payload.NewGetBlocks(start, util.Uint256{})
return p.EnqueueMessage(NewMessage(s.Net, CMDGetHeaders, payload)) return p.EnqueueMessage(s.MkMsg(CMDGetHeaders, payload))
} }
// requestBlocks sends a getdata message to the peer // requestBlocks sends a getdata message to the peer
@ -574,7 +580,7 @@ func (s *Server) requestBlocks(p Peer) error {
} }
if len(hashes) > 0 { if len(hashes) > 0 {
payload := payload.NewInventory(payload.BlockType, hashes) payload := payload.NewInventory(payload.BlockType, hashes)
return p.EnqueueMessage(NewMessage(s.Net, CMDGetData, payload)) return p.EnqueueMessage(s.MkMsg(CMDGetData, payload))
} else if s.chain.HeaderHeight() < p.LastBlockIndex() { } else if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p) return s.requestHeaders(p)
} }
@ -675,7 +681,7 @@ func (s *Server) requestTx(hashes ...util.Uint256) {
func (s *Server) relayInventoryCmd(cmd CommandType, t payload.InventoryType, hashes ...util.Uint256) { func (s *Server) relayInventoryCmd(cmd CommandType, t payload.InventoryType, hashes ...util.Uint256) {
payload := payload.NewInventory(t, hashes) payload := payload.NewInventory(t, hashes)
msg := NewMessage(s.Net, cmd, payload) msg := s.MkMsg(cmd, payload)
for peer := range s.Peers() { for peer := range s.Peers() {
if !peer.Handshaked() || !peer.Version().Relay { if !peer.Handshaked() || !peer.Version().Relay {

View file

@ -356,7 +356,7 @@ func (p *TCPPeer) SendPing() error {
}) })
} }
p.lock.Unlock() p.lock.Unlock()
return p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) return p.EnqueueMessage(p.server.MkMsg(CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight())))
} }
// HandlePong handles a pong message received from the peer and does appropriate // HandlePong handles a pong message received from the peer and does appropriate