diff --git a/pkg/network/server.go b/pkg/network/server.go index 082d3c519..3ac65741b 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -152,6 +152,12 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) *S return s } +// MkMsg creates a new message based on the server configured network and given +// parameters. +func (s *Server) MkMsg(cmd CommandType, p payload.Payload) *Message { + return NewMessage(s.Net, cmd, p) +} + // ID returns the servers ID. func (s *Server) ID() uint32 { return s.id @@ -198,7 +204,7 @@ func (s *Server) run() { } if s.discovery.PoolCount() < minPoolCount { select { - case s.addrReq <- NewMessage(s.Net, CMDGetAddr, payload.NewNullPayload()): + case s.addrReq <- s.MkMsg(CMDGetAddr, payload.NewNullPayload()): // sent request default: // we have one in the queue already that is @@ -316,7 +322,7 @@ func (s *Server) getVersionMsg() *Message { s.chain.BlockHeight(), s.Relay, ) - return NewMessage(s.Net, CMDVersion, payload) + return s.MkMsg(CMDVersion, payload) } // When a peer sends out his version we reply with verack after validating @@ -339,7 +345,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { } } s.lock.RUnlock() - return p.SendVersionAck(NewMessage(s.Net, CMDVerack, nil)) + return p.SendVersionAck(s.MkMsg(CMDVerack, nil)) } // handleHeadersCmd processes the headers received from its peer. @@ -367,7 +373,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { // handlePing processes ping request. func (s *Server) handlePing(p Peer, ping *payload.Ping) error { - return p.EnqueueMessage(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight()))) + return p.EnqueueMessage(s.MkMsg(CMDPong, payload.NewPing(s.id, s.chain.BlockHeight()))) } // handlePing processes pong request. @@ -401,7 +407,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { } } if len(reqHashes) > 0 { - msg := NewMessage(s.Net, CMDGetData, payload.NewInventory(inv.Type, reqHashes)) + msg := s.MkMsg(CMDGetData, payload.NewInventory(inv.Type, reqHashes)) pkt, err := msg.Bytes() if err != nil { return err @@ -423,16 +429,16 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { case payload.TXType: tx, _, err := s.chain.GetTransaction(hash) if err == nil { - msg = NewMessage(s.Net, CMDTX, tx) + msg = s.MkMsg(CMDTX, tx) } case payload.BlockType: b, err := s.chain.GetBlock(hash) if err == nil { - msg = NewMessage(s.Net, CMDBlock, b) + msg = s.MkMsg(CMDBlock, b) } case payload.ConsensusType: if cp := s.consensus.GetPayload(hash); cp != nil { - msg = NewMessage(s.Net, CMDConsensus, cp) + msg = s.MkMsg(CMDConsensus, cp) } } if msg != nil { @@ -475,7 +481,7 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error { return nil } payload := payload.NewInventory(payload.BlockType, blockHashes) - msg := NewMessage(s.Net, CMDInv, payload) + msg := s.MkMsg(CMDInv, payload) return p.EnqueueMessage(msg) } @@ -505,7 +511,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error { if len(resp.Hdrs) == 0 { return nil } - msg := NewMessage(s.Net, CMDHeaders, &resp) + msg := s.MkMsg(CMDHeaders, &resp) return p.EnqueueMessage(msg) } @@ -547,7 +553,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error { netaddr, _ := net.ResolveTCPAddr("tcp", addr) alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts) } - return p.EnqueueMessage(NewMessage(s.Net, CMDAddr, alist)) + return p.EnqueueMessage(s.MkMsg(CMDAddr, alist)) } // requestHeaders sends a getheaders message to the peer. @@ -555,7 +561,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error { func (s *Server) requestHeaders(p Peer) error { start := []util.Uint256{s.chain.CurrentHeaderHash()} payload := payload.NewGetBlocks(start, util.Uint256{}) - return p.EnqueueMessage(NewMessage(s.Net, CMDGetHeaders, payload)) + return p.EnqueueMessage(s.MkMsg(CMDGetHeaders, payload)) } // requestBlocks sends a getdata message to the peer @@ -574,7 +580,7 @@ func (s *Server) requestBlocks(p Peer) error { } if len(hashes) > 0 { payload := payload.NewInventory(payload.BlockType, hashes) - return p.EnqueueMessage(NewMessage(s.Net, CMDGetData, payload)) + return p.EnqueueMessage(s.MkMsg(CMDGetData, payload)) } else if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) } @@ -675,7 +681,7 @@ func (s *Server) requestTx(hashes ...util.Uint256) { func (s *Server) relayInventoryCmd(cmd CommandType, t payload.InventoryType, hashes ...util.Uint256) { payload := payload.NewInventory(t, hashes) - msg := NewMessage(s.Net, cmd, payload) + msg := s.MkMsg(cmd, payload) for peer := range s.Peers() { if !peer.Handshaked() || !peer.Version().Relay { diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index b41726bbc..9d2964263 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -356,7 +356,7 @@ func (p *TCPPeer) SendPing() error { }) } p.lock.Unlock() - return p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) + return p.EnqueueMessage(p.server.MkMsg(CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) } // HandlePong handles a pong message received from the peer and does appropriate