diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 59c6fe2ac..a44b1a038 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -105,26 +105,7 @@ func (p *localPeer) Disconnect(err error) { p.server.unregister <- peerDrop{p, err} } -func (p *localPeer) EnqueueMessage(msg *Message) error { - b, err := msg.Bytes() - if err != nil { - return err - } - return p.EnqueueHPPacket(b) -} func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error { - return p.EnqueueHPPacket(m) -} -func (p *localPeer) EnqueueP2PMessage(msg *Message) error { - return p.EnqueueMessage(msg) -} -func (p *localPeer) EnqueueP2PPacket(m []byte) error { - return p.EnqueueHPPacket(m) -} -func (p *localPeer) BroadcastHPPacket(_ context.Context, m []byte) error { - return p.EnqueueHPPacket(m) -} -func (p *localPeer) EnqueueHPPacket(m []byte) error { msg := &Message{} r := io.NewBinReaderFromBuf(m) err := msg.Decode(r) @@ -133,6 +114,16 @@ func (p *localPeer) EnqueueHPPacket(m []byte) error { } return nil } +func (p *localPeer) EnqueueP2PMessage(msg *Message) error { + return p.EnqueueHPMessage(msg) +} +func (p *localPeer) BroadcastHPPacket(ctx context.Context, m []byte) error { + return p.BroadcastPacket(ctx, m) +} +func (p *localPeer) EnqueueHPMessage(msg *Message) error { + p.messageHandler(p.t, msg) + return nil +} func (p *localPeer) Version() *payload.Version { return p.version } @@ -148,11 +139,11 @@ func (p *localPeer) SendVersion() error { if err != nil { return err } - _ = p.EnqueueMessage(m) + _ = p.EnqueueHPMessage(m) return nil } func (p *localPeer) SendVersionAck(m *Message) error { - _ = p.EnqueueMessage(m) + _ = p.EnqueueHPMessage(m) return nil } func (p *localPeer) HandleVersionAck() error { diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 78820ab56..9854165d5 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -32,24 +32,17 @@ type Peer interface { // queue. BroadcastHPPacket(context.Context, []byte) error - // EnqueueP2PMessage is a temporary wrapper that sends a message via - // EnqueueP2PPacket if there is no error in serializing it. + // EnqueueP2PMessage is a blocking packet enqueuer, it doesn't return until + // it puts the given message into the queue. It returns an error if the peer + // has not yet completed handshaking. This queue is intended to be used for + // unicast peer to peer communication that is more important than broadcasts + // (handled by BroadcastPacket) but less important than high-priority + // messages (handled by EnqueueHPMessage). EnqueueP2PMessage(*Message) error - // EnqueueP2PPacket is a blocking packet enqueuer, it doesn't return until - // it puts the given packet into the queue. It accepts a slice of bytes that - // can be shared with other queues (so that message marshalling can be - // done once for all peers). It returns an error if the peer has not yet - // completed handshaking. This queue is intended to be used for unicast - // peer to peer communication that is more important than broadcasts - // (handled by BroadcastPacket) but less important than high-priority - // messages (handled by EnqueueHPPacket and BroadcastHPPacket). - EnqueueP2PPacket([]byte) error - - // EnqueueHPPacket is a blocking high priority packet enqueuer, it - // doesn't return until it puts the given packet into the high-priority + // EnqueueHPMessage is similar to EnqueueP2PMessage, but uses a high-priority // queue. - EnqueueHPPacket([]byte) error + EnqueueHPMessage(*Message) error Version() *payload.Version LastBlockIndex() uint32 Handshaked() bool diff --git a/pkg/network/server.go b/pkg/network/server.go index 1663b051f..44943b484 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -748,14 +748,10 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { } if len(reqHashes) > 0 { msg := NewMessage(CMDGetData, payload.NewInventory(inv.Type, reqHashes)) - pkt, err := msg.Bytes() - if err != nil { - return err - } if inv.Type == payload.ExtensibleType { - return p.EnqueueHPPacket(pkt) + return p.EnqueueHPMessage(msg) } - return p.EnqueueP2PPacket(pkt) + return p.EnqueueP2PMessage(msg) } return nil } @@ -812,13 +808,11 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { } } if msg != nil { - pkt, err := msg.Bytes() - if err == nil { - if inv.Type == payload.ExtensibleType { - err = p.EnqueueHPPacket(pkt) - } else { - err = p.EnqueueP2PPacket(pkt) - } + var err error + if inv.Type == payload.ExtensibleType { + err = p.EnqueueHPMessage(msg) + } else { + err = p.EnqueueP2PMessage(msg) } if err != nil { return err diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 3019d6780..7b0982390 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -132,20 +132,14 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error { return p.putPacketIntoQueue(queue, b) } -// EnqueueP2PPacket implements the Peer interface. -func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error { - return p.putPacketIntoQueue(p.p2pSendQ, msg) -} - // EnqueueP2PMessage implements the Peer interface. func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error { return p.putMsgIntoQueue(p.p2pSendQ, msg) } -// EnqueueHPPacket implements the Peer interface. It the peer is not yet -// handshaked it's a noop. -func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { - return p.putPacketIntoQueue(p.hpSendQ, msg) +// EnqueueHPMessage implements the Peer interface. +func (p *TCPPeer) EnqueueHPMessage(msg *Message) error { + return p.putMsgIntoQueue(p.hpSendQ, msg) } func (p *TCPPeer) writeMsg(msg *Message) error {