network: deduplicate TCPPeer code a bit

context.Background() is never canceled and has no deadline, so we can avoid
duplicating some code.
This commit is contained in:
Roman Khimov 2022-10-12 15:43:31 +03:00
parent 104da8caff
commit 137f2cb192

View file

@ -81,9 +81,9 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
} }
} }
// putBroadcastPacketIntoQueue puts the given message into the given queue if // putPacketIntoQueue puts the given message into the given queue if
// the peer has done handshaking using the given context. // the peer has done handshaking using the given context.
func (p *TCPPeer) putBroadcastPacketIntoQueue(ctx context.Context, queue chan<- []byte, msg []byte) error { func (p *TCPPeer) putPacketIntoQueue(ctx context.Context, queue chan<- []byte, msg []byte) error {
if !p.Handshaked() { if !p.Handshaked() {
return errStateMismatch return errStateMismatch
} }
@ -97,29 +97,15 @@ func (p *TCPPeer) putBroadcastPacketIntoQueue(ctx context.Context, queue chan<-
return nil return nil
} }
// putPacketIntoQueue puts the given message into the given queue if the peer has
// done handshaking.
func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error {
if !p.Handshaked() {
return errStateMismatch
}
select {
case queue <- msg:
case <-p.done:
return errGone
}
return nil
}
// BroadcastPacket implements the Peer interface. // BroadcastPacket implements the Peer interface.
func (p *TCPPeer) BroadcastPacket(ctx context.Context, msg []byte) error { func (p *TCPPeer) BroadcastPacket(ctx context.Context, msg []byte) error {
return p.putBroadcastPacketIntoQueue(ctx, p.sendQ, msg) return p.putPacketIntoQueue(ctx, p.sendQ, msg)
} }
// BroadcastHPPacket implements the Peer interface. It the peer is not yet // BroadcastHPPacket implements the Peer interface. It the peer is not yet
// handshaked it's a noop. // handshaked it's a noop.
func (p *TCPPeer) BroadcastHPPacket(ctx context.Context, msg []byte) error { func (p *TCPPeer) BroadcastHPPacket(ctx context.Context, msg []byte) error {
return p.putBroadcastPacketIntoQueue(ctx, p.hpSendQ, msg) return p.putPacketIntoQueue(ctx, p.hpSendQ, msg)
} }
// putMessageIntoQueue serializes the given Message and puts it into given queue if // putMessageIntoQueue serializes the given Message and puts it into given queue if
@ -129,7 +115,7 @@ func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error {
if err != nil { if err != nil {
return err return err
} }
return p.putPacketIntoQueue(queue, b) return p.putPacketIntoQueue(context.Background(), queue, b)
} }
// EnqueueP2PMessage implements the Peer interface. // EnqueueP2PMessage implements the Peer interface.