network: broadcast messages, enqueue packets

Drop EnqueueP2PPacket, replace EnqueueHPPacket with EnqueueHPMessage. We use
Enqueue* when we have a specific per-peer message, it makes zero sense
duplicating serialization code for it (unlike Broadcast*).
This commit is contained in:
Roman Khimov 2022-10-12 15:39:20 +03:00
parent d5f2ad86a1
commit 104da8caff
4 changed files with 30 additions and 58 deletions

View file

@ -105,26 +105,7 @@ func (p *localPeer) Disconnect(err error) {
p.server.unregister <- peerDrop{p, err} p.server.unregister <- peerDrop{p, err}
} }
func (p *localPeer) EnqueueMessage(msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.EnqueueHPPacket(b)
}
func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error { func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueP2PMessage(msg *Message) error {
return p.EnqueueMessage(msg)
}
func (p *localPeer) EnqueueP2PPacket(m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) BroadcastHPPacket(_ context.Context, m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) EnqueueHPPacket(m []byte) error {
msg := &Message{} msg := &Message{}
r := io.NewBinReaderFromBuf(m) r := io.NewBinReaderFromBuf(m)
err := msg.Decode(r) err := msg.Decode(r)
@ -133,6 +114,16 @@ func (p *localPeer) EnqueueHPPacket(m []byte) error {
} }
return nil return nil
} }
func (p *localPeer) EnqueueP2PMessage(msg *Message) error {
return p.EnqueueHPMessage(msg)
}
func (p *localPeer) BroadcastHPPacket(ctx context.Context, m []byte) error {
return p.BroadcastPacket(ctx, m)
}
func (p *localPeer) EnqueueHPMessage(msg *Message) error {
p.messageHandler(p.t, msg)
return nil
}
func (p *localPeer) Version() *payload.Version { func (p *localPeer) Version() *payload.Version {
return p.version return p.version
} }
@ -148,11 +139,11 @@ func (p *localPeer) SendVersion() error {
if err != nil { if err != nil {
return err return err
} }
_ = p.EnqueueMessage(m) _ = p.EnqueueHPMessage(m)
return nil return nil
} }
func (p *localPeer) SendVersionAck(m *Message) error { func (p *localPeer) SendVersionAck(m *Message) error {
_ = p.EnqueueMessage(m) _ = p.EnqueueHPMessage(m)
return nil return nil
} }
func (p *localPeer) HandleVersionAck() error { func (p *localPeer) HandleVersionAck() error {

View file

@ -32,24 +32,17 @@ type Peer interface {
// queue. // queue.
BroadcastHPPacket(context.Context, []byte) error BroadcastHPPacket(context.Context, []byte) error
// EnqueueP2PMessage is a temporary wrapper that sends a message via // EnqueueP2PMessage is a blocking packet enqueuer, it doesn't return until
// EnqueueP2PPacket if there is no error in serializing it. // it puts the given message into the queue. It returns an error if the peer
// has not yet completed handshaking. This queue is intended to be used for
// unicast peer to peer communication that is more important than broadcasts
// (handled by BroadcastPacket) but less important than high-priority
// messages (handled by EnqueueHPMessage).
EnqueueP2PMessage(*Message) error EnqueueP2PMessage(*Message) error
// EnqueueP2PPacket is a blocking packet enqueuer, it doesn't return until // EnqueueHPMessage is similar to EnqueueP2PMessage, but uses a high-priority
// it puts the given packet into the queue. It accepts a slice of bytes that
// can be shared with other queues (so that message marshalling can be
// done once for all peers). It returns an error if the peer has not yet
// completed handshaking. This queue is intended to be used for unicast
// peer to peer communication that is more important than broadcasts
// (handled by BroadcastPacket) but less important than high-priority
// messages (handled by EnqueueHPPacket and BroadcastHPPacket).
EnqueueP2PPacket([]byte) error
// EnqueueHPPacket is a blocking high priority packet enqueuer, it
// doesn't return until it puts the given packet into the high-priority
// queue. // queue.
EnqueueHPPacket([]byte) error EnqueueHPMessage(*Message) error
Version() *payload.Version Version() *payload.Version
LastBlockIndex() uint32 LastBlockIndex() uint32
Handshaked() bool Handshaked() bool

View file

@ -748,14 +748,10 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
} }
if len(reqHashes) > 0 { if len(reqHashes) > 0 {
msg := NewMessage(CMDGetData, payload.NewInventory(inv.Type, reqHashes)) msg := NewMessage(CMDGetData, payload.NewInventory(inv.Type, reqHashes))
pkt, err := msg.Bytes()
if err != nil {
return err
}
if inv.Type == payload.ExtensibleType { if inv.Type == payload.ExtensibleType {
return p.EnqueueHPPacket(pkt) return p.EnqueueHPMessage(msg)
} }
return p.EnqueueP2PPacket(pkt) return p.EnqueueP2PMessage(msg)
} }
return nil return nil
} }
@ -812,13 +808,11 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
} }
} }
if msg != nil { if msg != nil {
pkt, err := msg.Bytes() var err error
if err == nil { if inv.Type == payload.ExtensibleType {
if inv.Type == payload.ExtensibleType { err = p.EnqueueHPMessage(msg)
err = p.EnqueueHPPacket(pkt) } else {
} else { err = p.EnqueueP2PMessage(msg)
err = p.EnqueueP2PPacket(pkt)
}
} }
if err != nil { if err != nil {
return err return err

View file

@ -132,20 +132,14 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error {
return p.putPacketIntoQueue(queue, b) return p.putPacketIntoQueue(queue, b)
} }
// EnqueueP2PPacket implements the Peer interface.
func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error {
return p.putPacketIntoQueue(p.p2pSendQ, msg)
}
// EnqueueP2PMessage implements the Peer interface. // EnqueueP2PMessage implements the Peer interface.
func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error { func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error {
return p.putMsgIntoQueue(p.p2pSendQ, msg) return p.putMsgIntoQueue(p.p2pSendQ, msg)
} }
// EnqueueHPPacket implements the Peer interface. It the peer is not yet // EnqueueHPMessage implements the Peer interface.
// handshaked it's a noop. func (p *TCPPeer) EnqueueHPMessage(msg *Message) error {
func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { return p.putMsgIntoQueue(p.hpSendQ, msg)
return p.putPacketIntoQueue(p.hpSendQ, msg)
} }
func (p *TCPPeer) writeMsg(msg *Message) error { func (p *TCPPeer) writeMsg(msg *Message) error {