neoneo-go/pkg/network/tcp_peer.go

486 lines
12 KiB
Go
Raw Normal View History

package network
import (
"errors"
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"go.uber.org/atomic"
"go.uber.org/zap"
)
type handShakeStage uint8
const (
versionSent handShakeStage = 1 << iota
versionReceived
verAckSent
verAckReceived
requestQueueSize = 32
p2pMsgQueueSize = 16
hpRequestQueueSize = 4
)
var (
network: fix networking stalls caused by stale peers We can leak sending goroutines and stall broadcasts because of already gone peers that happened to be cached by some s.Peers() user (more than 800 of these can be seen in nodoka log along with (*Server).run blocking on CMDGetAddr send): Feb 10 16:35:15 nodoka neo-go[1563]: goroutine 41 [chan send, 3320 minutes]: Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).putPacketIntoQueue(...) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:81 Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).EnqueueHPPacket(0xc0083d57a0, 0xc017206100, 0x18, 0x40, 0x136a240, 0xc018ef9720) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:119 +0x98 Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).iteratePeersWithSendMsg(0xc0000ca000, 0xc0001848a0, 0xcb4550, 0x0) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:720 +0x12a Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).broadcastHPMessage(...) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:731 Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).run(0xc0000ca000) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:203 +0xee4 Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).Start(0xc0000ca000, 0xc000072c60) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:173 +0x2ec Feb 10 16:35:15 nodoka neo-go[1563]: created by github.com/CityOfZion/neo-go/cli/server.startServer Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/cli/server/server.go:331 +0x476
2020-02-10 14:21:56 +00:00
errGone = errors.New("the peer is gone already")
errBusy = errors.New("peer is busy")
errStateMismatch = errors.New("tried to send protocol message before handshake completed")
errPingPong = errors.New("ping/pong timeout")
errUnexpectedPong = errors.New("pong message wasn't expected")
)
// TCPPeer represents a connected remote node in the
// network over TCP.
type TCPPeer struct {
// underlying TCP connection.
conn net.Conn
// The server this peer belongs to.
server *Server
// The version of the peer.
version *payload.Version
// Index of the last block.
lastBlockIndex uint32
lock sync.RWMutex
finale sync.Once
handShake handShakeStage
isFullNode bool
done chan struct{}
sendQ chan []byte
p2pSendQ chan []byte
hpSendQ chan []byte
wg sync.WaitGroup
// track outstanding getaddr requests.
getAddrSent atomic.Int32
// number of sent pings.
pingSent int
pingTimer *time.Timer
}
// NewTCPPeer returns a TCPPeer structure based on the given connection.
func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
return &TCPPeer{
conn: conn,
server: s,
done: make(chan struct{}),
sendQ: make(chan []byte, requestQueueSize),
p2pSendQ: make(chan []byte, p2pMsgQueueSize),
hpSendQ: make(chan []byte, hpRequestQueueSize),
}
}
// putPacketIntoQueue puts given message into the given queue if the peer has
// done handshaking.
func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, block bool, msg []byte) error {
if !p.Handshaked() {
return errStateMismatch
}
if block {
select {
case queue <- msg:
case <-p.done:
return errGone
}
} else {
select {
case queue <- msg:
case <-p.done:
return errGone
default:
return errBusy
}
network: fix networking stalls caused by stale peers We can leak sending goroutines and stall broadcasts because of already gone peers that happened to be cached by some s.Peers() user (more than 800 of these can be seen in nodoka log along with (*Server).run blocking on CMDGetAddr send): Feb 10 16:35:15 nodoka neo-go[1563]: goroutine 41 [chan send, 3320 minutes]: Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).putPacketIntoQueue(...) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:81 Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).EnqueueHPPacket(0xc0083d57a0, 0xc017206100, 0x18, 0x40, 0x136a240, 0xc018ef9720) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:119 +0x98 Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).iteratePeersWithSendMsg(0xc0000ca000, 0xc0001848a0, 0xcb4550, 0x0) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:720 +0x12a Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).broadcastHPMessage(...) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:731 Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).run(0xc0000ca000) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:203 +0xee4 Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).Start(0xc0000ca000, 0xc000072c60) Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:173 +0x2ec Feb 10 16:35:15 nodoka neo-go[1563]: created by github.com/CityOfZion/neo-go/cli/server.startServer Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/cli/server/server.go:331 +0x476
2020-02-10 14:21:56 +00:00
}
return nil
}
// EnqueuePacket implements the Peer interface.
func (p *TCPPeer) EnqueuePacket(block bool, msg []byte) error {
return p.putPacketIntoQueue(p.sendQ, block, msg)
}
// putMessageIntoQueue serializes given Message and puts it into given queue if
// the peer has done handshaking.
func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.putPacketIntoQueue(queue, true, b)
}
// EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it.
func (p *TCPPeer) EnqueueMessage(msg *Message) error {
return p.putMsgIntoQueue(p.sendQ, msg)
}
// EnqueueP2PPacket implements the Peer interface.
func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error {
return p.putPacketIntoQueue(p.p2pSendQ, true, msg)
}
// EnqueueP2PMessage implements the Peer interface.
func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error {
return p.putMsgIntoQueue(p.p2pSendQ, msg)
}
// EnqueueHPPacket implements the Peer interface. It the peer is not yet
// handshaked it's a noop.
func (p *TCPPeer) EnqueueHPPacket(block bool, msg []byte) error {
return p.putPacketIntoQueue(p.hpSendQ, block, msg)
}
func (p *TCPPeer) writeMsg(msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
_, err = p.conn.Write(b)
return err
}
// handleConn handles the read side of the connection, it should be started as
// a goroutine right after the new peer setup.
func (p *TCPPeer) handleConn() {
var err error
p.server.register <- p
go p.handleQueues()
// When a new peer is connected we send out our version immediately.
err = p.SendVersion()
if err == nil {
r := io.NewBinReaderFromIO(p.conn)
for {
msg := &Message{Network: p.server.network, StateRootInHeader: p.server.stateRootInHeader}
err = msg.Decode(r)
if err == payload.ErrTooManyHeaders {
p.server.log.Warn("not all headers were processed")
r.Err = nil
} else if err != nil {
break
}
if err = p.server.handleMessage(p, msg); err != nil {
if p.Handshaked() {
err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err)
}
break
}
}
}
p.Disconnect(err)
}
// handleQueues is a goroutine that is started automatically to handle
// send queues.
func (p *TCPPeer) handleQueues() {
var err error
// p2psend queue shares its time with send queue in around
// ((p2pSkipDivisor - 1) * 2 + 1)/1 ratio, ratio because the third
// select can still choose p2psend over send.
var p2pSkipCounter uint32
const p2pSkipDivisor = 4
for {
var msg []byte
// This one is to give priority to the hp queue
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
default:
}
// Skip this select every p2pSkipDivisor iteration.
if msg == nil && p2pSkipCounter%p2pSkipDivisor != 0 {
// Then look at the p2p queue.
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
case msg = <-p.p2pSendQ:
default:
}
}
// If there is no message in HP or P2P queues, block until one
// appears in any of the queues.
if msg == nil {
select {
case <-p.done:
return
case msg = <-p.hpSendQ:
case msg = <-p.p2pSendQ:
case msg = <-p.sendQ:
}
}
_, err = p.conn.Write(msg)
if err != nil {
break
}
p2pSkipCounter++
}
p.Disconnect(err)
}
// StartProtocol starts a long running background loop that interacts
// every ProtoTickInterval with the peer. It's only good to run after the
// handshake.
func (p *TCPPeer) StartProtocol() {
var err error
p.server.log.Info("started protocol",
zap.Stringer("addr", p.RemoteAddr()),
zap.ByteString("userAgent", p.Version().UserAgent),
zap.Uint32("startHeight", p.lastBlockIndex),
zap.Uint32("id", p.Version().Nonce))
p.server.discovery.RegisterGoodAddr(p.PeerAddr().String(), p.version.Capabilities)
if p.server.chain.BlockHeight() < p.LastBlockIndex() {
err = p.server.requestBlocks(p)
if err != nil {
p.Disconnect(err)
return
}
}
timer := time.NewTimer(p.server.ProtoTickInterval)
for {
select {
case <-p.done:
return
case <-timer.C:
// Try to sync in headers and block with the peer if his block height is higher then ours.
if p.LastBlockIndex() > p.server.chain.BlockHeight() {
err = p.server.requestBlocks(p)
}
if err == nil {
timer.Reset(p.server.ProtoTickInterval)
}
}
if err != nil {
timer.Stop()
p.Disconnect(err)
return
}
}
}
// Handshaked returns status of the handshake, whether it's completed or not.
func (p *TCPPeer) Handshaked() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.handshaked()
}
// handshaked is internal unlocked version of Handshaked().
func (p *TCPPeer) handshaked() bool {
return p.handShake == (verAckReceived | verAckSent | versionReceived | versionSent)
}
// IsFullNode returns whether the node has full capability or TCP/WS only.
func (p *TCPPeer) IsFullNode() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.handshaked() && p.isFullNode
}
// SendVersion checks for the handshake state and sends a message to the peer.
func (p *TCPPeer) SendVersion() error {
msg, err := p.server.getVersionMsg()
if err != nil {
return err
}
p.lock.Lock()
defer p.lock.Unlock()
if p.handShake&versionSent != 0 {
return errors.New("invalid handshake: already sent Version")
}
err = p.writeMsg(msg)
if err == nil {
p.handShake |= versionSent
}
return err
}
// HandleVersion checks for the handshake state and version message contents.
func (p *TCPPeer) HandleVersion(version *payload.Version) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.handShake&versionReceived != 0 {
return errors.New("invalid handshake: already received Version")
}
p.version = version
for _, cap := range version.Capabilities {
if cap.Type == capability.FullNode {
p.isFullNode = true
p.lastBlockIndex = cap.Data.(*capability.Node).StartHeight
break
}
}
p.handShake |= versionReceived
return nil
}
// SendVersionAck checks for the handshake state and sends a message to the peer.
func (p *TCPPeer) SendVersionAck(msg *Message) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.handShake&versionReceived == 0 {
return errors.New("invalid handshake: tried to send VersionAck, but no version received yet")
}
if p.handShake&versionSent == 0 {
return errors.New("invalid handshake: tried to send VersionAck, but didn't send Version yet")
}
if p.handShake&verAckSent != 0 {
return errors.New("invalid handshake: already sent VersionAck")
}
err := p.writeMsg(msg)
if err == nil {
p.handShake |= verAckSent
}
return err
}
// HandleVersionAck checks handshake sequence correctness when VerAck message
// is received.
func (p *TCPPeer) HandleVersionAck() error {
p.lock.Lock()
defer p.lock.Unlock()
if p.handShake&versionSent == 0 {
return errors.New("invalid handshake: received VersionAck, but no version sent yet")
}
if p.handShake&versionReceived == 0 {
return errors.New("invalid handshake: received VersionAck, but no version received yet")
}
if p.handShake&verAckReceived != 0 {
return errors.New("invalid handshake: already received VersionAck")
}
p.handShake |= verAckReceived
return nil
}
// RemoteAddr implements the Peer interface.
func (p *TCPPeer) RemoteAddr() net.Addr {
return p.conn.RemoteAddr()
}
// PeerAddr implements the Peer interface.
func (p *TCPPeer) PeerAddr() net.Addr {
remote := p.conn.RemoteAddr()
// The network can be non-tcp in unit tests.
if p.version == nil || remote.Network() != "tcp" {
return p.RemoteAddr()
}
host, _, err := net.SplitHostPort(remote.String())
if err != nil {
return p.RemoteAddr()
}
var port uint16
for _, cap := range p.version.Capabilities {
if cap.Type == capability.TCPServer {
port = cap.Data.(*capability.Server).Port
}
}
if port == 0 {
return p.RemoteAddr()
}
addrString := net.JoinHostPort(host, strconv.Itoa(int(port)))
tcpAddr, err := net.ResolveTCPAddr("tcp", addrString)
if err != nil {
return p.RemoteAddr()
}
return tcpAddr
}
// Disconnect will fill the peer's done channel with the given error.
func (p *TCPPeer) Disconnect(err error) {
p.finale.Do(func() {
close(p.done)
network: change the disconnect procedure We can still lock the (*Server).run with dead peers: Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: goroutine 40 [select, 871 minutes]: Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).putPacketIntoQueue(0xc030ab5320, 0xc02f251f20, 0xc00af0dcc0, 0x18, 0x40, 0x100000000000000, 0xffffffffffffffff) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:82 +0xf4 Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).EnqueueHPPacket(0xc030ab5320, 0xc00af0dcc0, 0x18, 0x40, 0x1367240, 0xc03090ef98) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:124 +0x52 Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*Server).iteratePeersWithSendMsg(0xc0000ca000, 0xc00af35800, 0xcb2a58, 0x0) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:720 +0x12a Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*Server).broadcastHPMessage(...) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:731 Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*Server).run(0xc0000ca000) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:203 +0xee4 Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*Server).Start(0xc0000ca000, 0xc000072ba0) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:173 +0x2ec Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: created by github.com/CityOfZion/neo-go/cli/server.startServer Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/cli/server/server.go:331 +0x476 ... Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: goroutine 2199 [chan send, 870 minutes]: Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).Disconnect.func1() Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:366 +0x85 Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: sync.(*Once).Do(0xc030ab403c, 0xc02f262788) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/usr/local/go/src/sync/once.go:44 +0xb3 Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).Disconnect(0xc030ab4000, 0xd92440, 0xc000065a00) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:365 +0x6d Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).SendPing.func1() Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:394 +0x42 Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: created by time.goFunc Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/usr/local/go/src/time/sleep.go:169 +0x44 ... Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: goroutine 3448 [chan send, 854 minutes]: Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).handleConn(0xc01ed203f0) Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:143 +0x6c Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: created by github.com/CityOfZion/neo-go/pkg/network.(*TCPTransport).Accept Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_transport.go:62 +0x44c ... The problem is that the select in putPacketIntoQueue() only works the way it was intended to after the `close(p.done)`, but that happens only after successful unregistration request send. Thus, do disconnects the other way around, first unblock queueing and exit goroutines, then destroy the connection (if it wasn't previously destroyed) and only after that signal to the Server.
2020-02-13 13:24:46 +00:00
p.conn.Close()
p.server.unregister <- peerDrop{p, err}
})
}
// Version implements the Peer interface.
func (p *TCPPeer) Version() *payload.Version {
return p.version
}
// LastBlockIndex returns last block index.
func (p *TCPPeer) LastBlockIndex() uint32 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.lastBlockIndex
}
// SendPing sends a ping message to the peer and does appropriate accounting of
// outstanding pings and timeouts.
func (p *TCPPeer) SendPing(msg *Message) error {
if !p.Handshaked() {
return errStateMismatch
}
p.lock.Lock()
p.pingSent++
if p.pingTimer == nil {
p.pingTimer = time.AfterFunc(p.server.PingTimeout, func() {
p.Disconnect(errPingPong)
})
}
p.lock.Unlock()
return p.EnqueueMessage(msg)
}
// HandlePing handles a ping message received from the peer.
func (p *TCPPeer) HandlePing(ping *payload.Ping) error {
p.lock.Lock()
defer p.lock.Unlock()
p.lastBlockIndex = ping.LastBlockIndex
return nil
}
// HandlePong handles a pong message received from the peer and does appropriate
// accounting of outstanding pings and timeouts.
func (p *TCPPeer) HandlePong(pong *payload.Ping) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.pingTimer != nil && !p.pingTimer.Stop() {
return errPingPong
}
p.pingTimer = nil
p.pingSent--
if p.pingSent < 0 {
return errUnexpectedPong
}
p.lastBlockIndex = pong.LastBlockIndex
return nil
}
// AddGetAddrSent increments internal outstanding getaddr requests counter. The
// peer can only send then one addr reply per getaddr request.
func (p *TCPPeer) AddGetAddrSent() {
p.getAddrSent.Inc()
}
// CanProcessAddr decrements internal outstanding getaddr requests counter and
// answers whether the addr command from the peer can be safely processed.
func (p *TCPPeer) CanProcessAddr() bool {
v := p.getAddrSent.Dec()
return v >= 0
}