2018-03-09 15:55:25 +00:00
|
|
|
package network
|
|
|
|
|
|
|
|
import (
|
network: rework broadcast logic
We have a number of queues for different purposes:
* regular broadcast queue
* direct p2p queue
* high-priority queue
And two basic egress scenarios:
* direct p2p messages (replies to requests in Server's handle* methods)
* broadcasted messages
Low priority broadcasted messages:
* transaction inventories
* block inventories
* notary inventories
* non-consensus extensibles
High-priority broadcasted messages:
* consensus extensibles
* getdata transaction requests from consensus process
* getaddr requests
P2P messages are a bit more complicated, most of the time they use p2p queue,
but extensible message requests/replies use HP queue.
Server's handle* code is run from Peer's handleIncoming, every peer has this
thread that handles incoming messages. When working with the peer it's
important to reply to requests and blocking this thread until we send (queue)
a reply is fine, if the peer is slow we just won't get anything new from
it. The queue used is irrelevant wrt this issue.
Broadcasted messages are radically different, we want them to be delivered to
many peers, but we don't care about specific ones. If it's delivered to 2/3 of
the peers we're fine, if it's delivered to more of them --- it's not an
issue. But doing this fairly is not an easy thing, current code tries performing
unblocked sends and if this doesn't yield enough results it then blocks (but
has a timeout, we can't wait indefinitely). But it does so in sequential
manner, once the peer is chosen the code will wait for it (and only it) until
timeout happens.
What can be done instead is an attempt to push the message to all of the peers
simultaneously (or close to that). If they all deliver --- OK, if some block
and wait then we can wait until _any_ of them pushes the message through (or
global timeout happens, we still can't wait forever). If we have enough
deliveries then we can cancel pending ones and it's again not an error if
these canceled threads still do their job.
This makes the system more dynamic and adds some substantial processing
overhead, but it's a networking code, any of this overhead is much lower than
the actual packet delivery time. It also allows to spread the load more
fairly, if there is any spare queue it'll get the packet and release the
broadcaster. On the next broadcast iteration another peer is more likely to be
chosen just because it didn't get a message previously (and had some time to
deliver already queued messages).
It works perfectly in tests, with optimal networking conditions we have much
better block times and TPS increases by 5-25%% depending on the scenario.
I'd go as far as to say that it fixes the original problem of #2678, because
in this particular scenario we have empty queues in ~100% of the cases and
this new logic will likely lead to 100% fan out in this case (cancelation just
won't happen fast enough). But when the load grows and there is some waiting
in the queue it will optimize out the slowest links.
2022-10-10 19:48:06 +00:00
|
|
|
"context"
|
2019-09-13 12:43:22 +00:00
|
|
|
"errors"
|
2020-01-28 13:38:39 +00:00
|
|
|
"fmt"
|
2018-03-09 15:55:25 +00:00
|
|
|
"net"
|
2019-11-06 07:55:21 +00:00
|
|
|
"strconv"
|
2018-03-10 12:04:06 +00:00
|
|
|
"sync"
|
2020-01-15 14:03:42 +00:00
|
|
|
"time"
|
2018-03-09 15:55:25 +00:00
|
|
|
|
2020-03-03 14:21:42 +00:00
|
|
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
2020-05-22 09:17:17 +00:00
|
|
|
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
2020-03-03 14:21:42 +00:00
|
|
|
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
2020-11-25 10:34:38 +00:00
|
|
|
"go.uber.org/atomic"
|
2020-01-15 14:03:42 +00:00
|
|
|
"go.uber.org/zap"
|
2018-03-09 15:55:25 +00:00
|
|
|
)
|
|
|
|
|
2019-09-13 12:43:22 +00:00
|
|
|
type handShakeStage uint8
|
|
|
|
|
|
|
|
const (
|
2019-11-06 08:06:00 +00:00
|
|
|
versionSent handShakeStage = 1 << iota
|
|
|
|
versionReceived
|
|
|
|
verAckSent
|
|
|
|
verAckReceived
|
2020-01-16 18:16:31 +00:00
|
|
|
|
|
|
|
requestQueueSize = 32
|
2020-01-23 16:40:40 +00:00
|
|
|
p2pMsgQueueSize = 16
|
2020-01-16 18:16:31 +00:00
|
|
|
hpRequestQueueSize = 4
|
2021-08-03 18:55:34 +00:00
|
|
|
incomingQueueSize = 1 // Each message can be up to 32MB in size.
|
2019-09-13 12:43:22 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
network: fix networking stalls caused by stale peers
We can leak sending goroutines and stall broadcasts because of already gone
peers that happened to be cached by some s.Peers() user (more than 800 of
these can be seen in nodoka log along with (*Server).run blocking on
CMDGetAddr send):
Feb 10 16:35:15 nodoka neo-go[1563]: goroutine 41 [chan send, 3320 minutes]:
Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).putPacketIntoQueue(...)
Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:81
Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).EnqueueHPPacket(0xc0083d57a0, 0xc017206100, 0x18, 0x40, 0x136a240, 0xc018ef9720)
Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:119 +0x98
Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).iteratePeersWithSendMsg(0xc0000ca000, 0xc0001848a0, 0xcb4550, 0x0)
Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:720 +0x12a
Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).broadcastHPMessage(...)
Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:731
Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).run(0xc0000ca000)
Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:203 +0xee4
Feb 10 16:35:15 nodoka neo-go[1563]: github.com/CityOfZion/neo-go/pkg/network.(*Server).Start(0xc0000ca000, 0xc000072c60)
Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:173 +0x2ec
Feb 10 16:35:15 nodoka neo-go[1563]: created by github.com/CityOfZion/neo-go/cli/server.startServer
Feb 10 16:35:15 nodoka neo-go[1563]: /go/src/github.com/CityOfZion/neo-go/cli/server/server.go:331 +0x476
2020-02-10 14:21:56 +00:00
|
|
|
errGone = errors.New("the peer is gone already")
|
2020-01-20 16:02:19 +00:00
|
|
|
errStateMismatch = errors.New("tried to send protocol message before handshake completed")
|
|
|
|
errPingPong = errors.New("ping/pong timeout")
|
|
|
|
errUnexpectedPong = errors.New("pong message wasn't expected")
|
2019-09-13 12:43:22 +00:00
|
|
|
)
|
|
|
|
|
2018-03-09 15:55:25 +00:00
|
|
|
// TCPPeer represents a connected remote node in the
|
|
|
|
// network over TCP.
|
|
|
|
type TCPPeer struct {
|
2018-03-14 09:36:59 +00:00
|
|
|
// underlying TCP connection.
|
2019-09-09 14:54:38 +00:00
|
|
|
conn net.Conn
|
2020-01-15 14:03:42 +00:00
|
|
|
// The server this peer belongs to.
|
|
|
|
server *Server
|
2018-03-14 09:36:59 +00:00
|
|
|
// The version of the peer.
|
2018-03-09 15:55:25 +00:00
|
|
|
version *payload.Version
|
2020-01-17 10:17:19 +00:00
|
|
|
// Index of the last block.
|
|
|
|
lastBlockIndex uint32
|
2018-03-09 15:55:25 +00:00
|
|
|
|
2020-05-22 09:17:17 +00:00
|
|
|
lock sync.RWMutex
|
|
|
|
finale sync.Once
|
|
|
|
handShake handShakeStage
|
|
|
|
isFullNode bool
|
2019-09-13 12:43:22 +00:00
|
|
|
|
2020-01-23 16:40:40 +00:00
|
|
|
done chan struct{}
|
|
|
|
sendQ chan []byte
|
|
|
|
p2pSendQ chan []byte
|
|
|
|
hpSendQ chan []byte
|
2021-08-03 18:55:34 +00:00
|
|
|
incoming chan *Message
|
2018-03-09 15:55:25 +00:00
|
|
|
|
2020-11-25 10:34:38 +00:00
|
|
|
// track outstanding getaddr requests.
|
|
|
|
getAddrSent atomic.Int32
|
|
|
|
|
2020-01-17 10:17:19 +00:00
|
|
|
// number of sent pings.
|
2020-01-20 16:02:19 +00:00
|
|
|
pingSent int
|
|
|
|
pingTimer *time.Timer
|
2018-03-09 15:55:25 +00:00
|
|
|
}
|
|
|
|
|
2019-09-03 14:51:37 +00:00
|
|
|
// NewTCPPeer returns a TCPPeer structure based on the given connection.
|
2020-01-15 14:03:42 +00:00
|
|
|
func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
|
2018-03-09 15:55:25 +00:00
|
|
|
return &TCPPeer{
|
2020-01-23 16:40:40 +00:00
|
|
|
conn: conn,
|
|
|
|
server: s,
|
|
|
|
done: make(chan struct{}),
|
|
|
|
sendQ: make(chan []byte, requestQueueSize),
|
|
|
|
p2pSendQ: make(chan []byte, p2pMsgQueueSize),
|
|
|
|
hpSendQ: make(chan []byte, hpRequestQueueSize),
|
2021-08-03 18:55:34 +00:00
|
|
|
incoming: make(chan *Message, incomingQueueSize),
|
2018-03-09 15:55:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-12 12:43:31 +00:00
|
|
|
// putPacketIntoQueue puts the given message into the given queue if
|
network: rework broadcast logic
We have a number of queues for different purposes:
* regular broadcast queue
* direct p2p queue
* high-priority queue
And two basic egress scenarios:
* direct p2p messages (replies to requests in Server's handle* methods)
* broadcasted messages
Low priority broadcasted messages:
* transaction inventories
* block inventories
* notary inventories
* non-consensus extensibles
High-priority broadcasted messages:
* consensus extensibles
* getdata transaction requests from consensus process
* getaddr requests
P2P messages are a bit more complicated, most of the time they use p2p queue,
but extensible message requests/replies use HP queue.
Server's handle* code is run from Peer's handleIncoming, every peer has this
thread that handles incoming messages. When working with the peer it's
important to reply to requests and blocking this thread until we send (queue)
a reply is fine, if the peer is slow we just won't get anything new from
it. The queue used is irrelevant wrt this issue.
Broadcasted messages are radically different, we want them to be delivered to
many peers, but we don't care about specific ones. If it's delivered to 2/3 of
the peers we're fine, if it's delivered to more of them --- it's not an
issue. But doing this fairly is not an easy thing, current code tries performing
unblocked sends and if this doesn't yield enough results it then blocks (but
has a timeout, we can't wait indefinitely). But it does so in sequential
manner, once the peer is chosen the code will wait for it (and only it) until
timeout happens.
What can be done instead is an attempt to push the message to all of the peers
simultaneously (or close to that). If they all deliver --- OK, if some block
and wait then we can wait until _any_ of them pushes the message through (or
global timeout happens, we still can't wait forever). If we have enough
deliveries then we can cancel pending ones and it's again not an error if
these canceled threads still do their job.
This makes the system more dynamic and adds some substantial processing
overhead, but it's a networking code, any of this overhead is much lower than
the actual packet delivery time. It also allows to spread the load more
fairly, if there is any spare queue it'll get the packet and release the
broadcaster. On the next broadcast iteration another peer is more likely to be
chosen just because it didn't get a message previously (and had some time to
deliver already queued messages).
It works perfectly in tests, with optimal networking conditions we have much
better block times and TPS increases by 5-25%% depending on the scenario.
I'd go as far as to say that it fixes the original problem of #2678, because
in this particular scenario we have empty queues in ~100% of the cases and
this new logic will likely lead to 100% fan out in this case (cancelation just
won't happen fast enough). But when the load grows and there is some waiting
in the queue it will optimize out the slowest links.
2022-10-10 19:48:06 +00:00
|
|
|
// the peer has done handshaking using the given context.
|
2022-10-12 12:43:31 +00:00
|
|
|
func (p *TCPPeer) putPacketIntoQueue(ctx context.Context, queue chan<- []byte, msg []byte) error {
|
network: rework broadcast logic
We have a number of queues for different purposes:
* regular broadcast queue
* direct p2p queue
* high-priority queue
And two basic egress scenarios:
* direct p2p messages (replies to requests in Server's handle* methods)
* broadcasted messages
Low priority broadcasted messages:
* transaction inventories
* block inventories
* notary inventories
* non-consensus extensibles
High-priority broadcasted messages:
* consensus extensibles
* getdata transaction requests from consensus process
* getaddr requests
P2P messages are a bit more complicated, most of the time they use p2p queue,
but extensible message requests/replies use HP queue.
Server's handle* code is run from Peer's handleIncoming, every peer has this
thread that handles incoming messages. When working with the peer it's
important to reply to requests and blocking this thread until we send (queue)
a reply is fine, if the peer is slow we just won't get anything new from
it. The queue used is irrelevant wrt this issue.
Broadcasted messages are radically different, we want them to be delivered to
many peers, but we don't care about specific ones. If it's delivered to 2/3 of
the peers we're fine, if it's delivered to more of them --- it's not an
issue. But doing this fairly is not an easy thing, current code tries performing
unblocked sends and if this doesn't yield enough results it then blocks (but
has a timeout, we can't wait indefinitely). But it does so in sequential
manner, once the peer is chosen the code will wait for it (and only it) until
timeout happens.
What can be done instead is an attempt to push the message to all of the peers
simultaneously (or close to that). If they all deliver --- OK, if some block
and wait then we can wait until _any_ of them pushes the message through (or
global timeout happens, we still can't wait forever). If we have enough
deliveries then we can cancel pending ones and it's again not an error if
these canceled threads still do their job.
This makes the system more dynamic and adds some substantial processing
overhead, but it's a networking code, any of this overhead is much lower than
the actual packet delivery time. It also allows to spread the load more
fairly, if there is any spare queue it'll get the packet and release the
broadcaster. On the next broadcast iteration another peer is more likely to be
chosen just because it didn't get a message previously (and had some time to
deliver already queued messages).
It works perfectly in tests, with optimal networking conditions we have much
better block times and TPS increases by 5-25%% depending on the scenario.
I'd go as far as to say that it fixes the original problem of #2678, because
in this particular scenario we have empty queues in ~100% of the cases and
this new logic will likely lead to 100% fan out in this case (cancelation just
won't happen fast enough). But when the load grows and there is some waiting
in the queue it will optimize out the slowest links.
2022-10-10 19:48:06 +00:00
|
|
|
if !p.Handshaked() {
|
|
|
|
return errStateMismatch
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case queue <- msg:
|
|
|
|
case <-p.done:
|
|
|
|
return errGone
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// BroadcastPacket implements the Peer interface.
|
|
|
|
func (p *TCPPeer) BroadcastPacket(ctx context.Context, msg []byte) error {
|
2022-10-12 12:43:31 +00:00
|
|
|
return p.putPacketIntoQueue(ctx, p.sendQ, msg)
|
network: rework broadcast logic
We have a number of queues for different purposes:
* regular broadcast queue
* direct p2p queue
* high-priority queue
And two basic egress scenarios:
* direct p2p messages (replies to requests in Server's handle* methods)
* broadcasted messages
Low priority broadcasted messages:
* transaction inventories
* block inventories
* notary inventories
* non-consensus extensibles
High-priority broadcasted messages:
* consensus extensibles
* getdata transaction requests from consensus process
* getaddr requests
P2P messages are a bit more complicated, most of the time they use p2p queue,
but extensible message requests/replies use HP queue.
Server's handle* code is run from Peer's handleIncoming, every peer has this
thread that handles incoming messages. When working with the peer it's
important to reply to requests and blocking this thread until we send (queue)
a reply is fine, if the peer is slow we just won't get anything new from
it. The queue used is irrelevant wrt this issue.
Broadcasted messages are radically different, we want them to be delivered to
many peers, but we don't care about specific ones. If it's delivered to 2/3 of
the peers we're fine, if it's delivered to more of them --- it's not an
issue. But doing this fairly is not an easy thing, current code tries performing
unblocked sends and if this doesn't yield enough results it then blocks (but
has a timeout, we can't wait indefinitely). But it does so in sequential
manner, once the peer is chosen the code will wait for it (and only it) until
timeout happens.
What can be done instead is an attempt to push the message to all of the peers
simultaneously (or close to that). If they all deliver --- OK, if some block
and wait then we can wait until _any_ of them pushes the message through (or
global timeout happens, we still can't wait forever). If we have enough
deliveries then we can cancel pending ones and it's again not an error if
these canceled threads still do their job.
This makes the system more dynamic and adds some substantial processing
overhead, but it's a networking code, any of this overhead is much lower than
the actual packet delivery time. It also allows to spread the load more
fairly, if there is any spare queue it'll get the packet and release the
broadcaster. On the next broadcast iteration another peer is more likely to be
chosen just because it didn't get a message previously (and had some time to
deliver already queued messages).
It works perfectly in tests, with optimal networking conditions we have much
better block times and TPS increases by 5-25%% depending on the scenario.
I'd go as far as to say that it fixes the original problem of #2678, because
in this particular scenario we have empty queues in ~100% of the cases and
this new logic will likely lead to 100% fan out in this case (cancelation just
won't happen fast enough). But when the load grows and there is some waiting
in the queue it will optimize out the slowest links.
2022-10-10 19:48:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// BroadcastHPPacket implements the Peer interface. It the peer is not yet
|
|
|
|
// handshaked it's a noop.
|
|
|
|
func (p *TCPPeer) BroadcastHPPacket(ctx context.Context, msg []byte) error {
|
2022-10-12 12:43:31 +00:00
|
|
|
return p.putPacketIntoQueue(ctx, p.hpSendQ, msg)
|
2020-01-23 16:40:40 +00:00
|
|
|
}
|
|
|
|
|
2022-04-20 18:30:09 +00:00
|
|
|
// putMessageIntoQueue serializes the given Message and puts it into given queue if
|
2020-01-23 16:40:40 +00:00
|
|
|
// the peer has done handshaking.
|
|
|
|
func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error {
|
2020-01-16 18:16:31 +00:00
|
|
|
b, err := msg.Bytes()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-10-12 12:43:31 +00:00
|
|
|
return p.putPacketIntoQueue(context.Background(), queue, b)
|
2020-01-23 16:40:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// EnqueueP2PMessage implements the Peer interface.
|
|
|
|
func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error {
|
|
|
|
return p.putMsgIntoQueue(p.p2pSendQ, msg)
|
2020-01-16 18:16:31 +00:00
|
|
|
}
|
|
|
|
|
2022-10-12 12:39:20 +00:00
|
|
|
// EnqueueHPMessage implements the Peer interface.
|
|
|
|
func (p *TCPPeer) EnqueueHPMessage(msg *Message) error {
|
|
|
|
return p.putMsgIntoQueue(p.hpSendQ, msg)
|
2019-09-13 12:43:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *TCPPeer) writeMsg(msg *Message) error {
|
2020-01-16 18:16:31 +00:00
|
|
|
b, err := msg.Bytes()
|
|
|
|
if err != nil {
|
2018-04-13 10:14:08 +00:00
|
|
|
return err
|
2020-01-15 15:25:58 +00:00
|
|
|
}
|
2019-11-16 09:42:03 +00:00
|
|
|
|
2020-01-16 18:16:31 +00:00
|
|
|
_, err = p.conn.Write(b)
|
2019-11-16 09:42:03 +00:00
|
|
|
|
2020-01-15 15:25:58 +00:00
|
|
|
return err
|
2018-03-09 15:55:25 +00:00
|
|
|
}
|
|
|
|
|
2020-01-15 14:03:42 +00:00
|
|
|
// handleConn handles the read side of the connection, it should be started as
|
2022-04-20 18:30:09 +00:00
|
|
|
// a goroutine right after a new peer setup.
|
2020-01-15 14:03:42 +00:00
|
|
|
func (p *TCPPeer) handleConn() {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
p.server.register <- p
|
|
|
|
|
2020-01-16 18:16:31 +00:00
|
|
|
go p.handleQueues()
|
2021-08-03 18:55:34 +00:00
|
|
|
go p.handleIncoming()
|
2022-04-20 18:30:09 +00:00
|
|
|
// When a new peer is connected, we send out our version immediately.
|
2020-01-21 14:26:08 +00:00
|
|
|
err = p.SendVersion()
|
2020-01-15 14:03:42 +00:00
|
|
|
if err == nil {
|
|
|
|
r := io.NewBinReaderFromIO(p.conn)
|
|
|
|
for {
|
2022-01-20 18:14:42 +00:00
|
|
|
msg := &Message{StateRootInHeader: p.server.config.StateRootInHeader}
|
2020-01-15 14:03:42 +00:00
|
|
|
err = msg.Decode(r)
|
|
|
|
|
2022-09-02 11:29:47 +00:00
|
|
|
if errors.Is(err, payload.ErrTooManyHeaders) {
|
2020-01-15 14:03:42 +00:00
|
|
|
p.server.log.Warn("not all headers were processed")
|
|
|
|
r.Err = nil
|
|
|
|
} else if err != nil {
|
|
|
|
break
|
|
|
|
}
|
2021-08-03 18:55:34 +00:00
|
|
|
p.incoming <- msg
|
|
|
|
}
|
|
|
|
}
|
|
|
|
p.Disconnect(err)
|
2021-11-01 09:20:55 +00:00
|
|
|
close(p.incoming)
|
2021-08-03 18:55:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *TCPPeer) handleIncoming() {
|
|
|
|
var err error
|
|
|
|
for msg := range p.incoming {
|
|
|
|
err = p.server.handleMessage(p, msg)
|
|
|
|
if err != nil {
|
|
|
|
if p.Handshaked() {
|
|
|
|
err = fmt.Errorf("handling %s message: %w", msg.Command.String(), err)
|
2020-01-15 14:03:42 +00:00
|
|
|
}
|
2021-08-03 18:55:34 +00:00
|
|
|
break
|
2020-01-15 14:03:42 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
p.Disconnect(err)
|
|
|
|
}
|
|
|
|
|
2020-01-16 18:16:31 +00:00
|
|
|
// handleQueues is a goroutine that is started automatically to handle
|
|
|
|
// send queues.
|
|
|
|
func (p *TCPPeer) handleQueues() {
|
|
|
|
var err error
|
2020-01-29 08:36:26 +00:00
|
|
|
// p2psend queue shares its time with send queue in around
|
|
|
|
// ((p2pSkipDivisor - 1) * 2 + 1)/1 ratio, ratio because the third
|
|
|
|
// select can still choose p2psend over send.
|
|
|
|
var p2pSkipCounter uint32
|
|
|
|
const p2pSkipDivisor = 4
|
2020-01-16 18:16:31 +00:00
|
|
|
|
2022-01-20 18:14:42 +00:00
|
|
|
var writeTimeout = time.Duration(p.server.config.SecondsPerBlock) * time.Second
|
2020-01-16 18:16:31 +00:00
|
|
|
for {
|
|
|
|
var msg []byte
|
|
|
|
|
|
|
|
// This one is to give priority to the hp queue
|
|
|
|
select {
|
|
|
|
case <-p.done:
|
|
|
|
return
|
|
|
|
case msg = <-p.hpSendQ:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
2020-01-29 08:36:26 +00:00
|
|
|
// Skip this select every p2pSkipDivisor iteration.
|
|
|
|
if msg == nil && p2pSkipCounter%p2pSkipDivisor != 0 {
|
2020-01-23 16:40:40 +00:00
|
|
|
// Then look at the p2p queue.
|
|
|
|
select {
|
|
|
|
case <-p.done:
|
|
|
|
return
|
|
|
|
case msg = <-p.hpSendQ:
|
|
|
|
case msg = <-p.p2pSendQ:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// If there is no message in HP or P2P queues, block until one
|
2020-01-16 18:16:31 +00:00
|
|
|
// appears in any of the queues.
|
|
|
|
if msg == nil {
|
|
|
|
select {
|
|
|
|
case <-p.done:
|
|
|
|
return
|
|
|
|
case msg = <-p.hpSendQ:
|
2020-01-23 16:40:40 +00:00
|
|
|
case msg = <-p.p2pSendQ:
|
2020-01-16 18:16:31 +00:00
|
|
|
case msg = <-p.sendQ:
|
|
|
|
}
|
|
|
|
}
|
2020-12-24 11:38:58 +00:00
|
|
|
err = p.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
|
|
|
|
if err != nil {
|
|
|
|
break
|
|
|
|
}
|
2020-01-16 18:16:31 +00:00
|
|
|
_, err = p.conn.Write(msg)
|
|
|
|
if err != nil {
|
|
|
|
break
|
|
|
|
}
|
2020-01-29 08:36:26 +00:00
|
|
|
p2pSkipCounter++
|
2020-01-16 18:16:31 +00:00
|
|
|
}
|
|
|
|
p.Disconnect(err)
|
|
|
|
}
|
|
|
|
|
2020-01-15 14:03:42 +00:00
|
|
|
// StartProtocol starts a long running background loop that interacts
|
|
|
|
// every ProtoTickInterval with the peer. It's only good to run after the
|
|
|
|
// handshake.
|
|
|
|
func (p *TCPPeer) StartProtocol() {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
p.server.log.Info("started protocol",
|
|
|
|
zap.Stringer("addr", p.RemoteAddr()),
|
|
|
|
zap.ByteString("userAgent", p.Version().UserAgent),
|
2020-05-22 09:17:17 +00:00
|
|
|
zap.Uint32("startHeight", p.lastBlockIndex),
|
2020-01-15 14:03:42 +00:00
|
|
|
zap.Uint32("id", p.Version().Nonce))
|
|
|
|
|
2020-05-22 09:59:18 +00:00
|
|
|
p.server.discovery.RegisterGoodAddr(p.PeerAddr().String(), p.version.Capabilities)
|
2021-07-30 13:57:42 +00:00
|
|
|
err = p.server.requestBlocksOrHeaders(p)
|
|
|
|
if err != nil {
|
|
|
|
p.Disconnect(err)
|
|
|
|
return
|
2020-01-15 14:03:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
timer := time.NewTimer(p.server.ProtoTickInterval)
|
|
|
|
for {
|
|
|
|
select {
|
2020-01-15 15:25:58 +00:00
|
|
|
case <-p.done:
|
|
|
|
return
|
2020-01-15 14:03:42 +00:00
|
|
|
case <-timer.C:
|
2021-07-30 13:57:42 +00:00
|
|
|
// Try to sync in headers and block with the peer if his block height is higher than ours.
|
|
|
|
err = p.server.requestBlocksOrHeaders(p)
|
2020-01-20 13:58:28 +00:00
|
|
|
if err == nil {
|
|
|
|
timer.Reset(p.server.ProtoTickInterval)
|
|
|
|
}
|
2020-01-15 14:03:42 +00:00
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
timer.Stop()
|
|
|
|
p.Disconnect(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-09-13 12:43:22 +00:00
|
|
|
// Handshaked returns status of the handshake, whether it's completed or not.
|
|
|
|
func (p *TCPPeer) Handshaked() bool {
|
2019-11-06 08:06:00 +00:00
|
|
|
p.lock.RLock()
|
|
|
|
defer p.lock.RUnlock()
|
2020-05-22 09:17:17 +00:00
|
|
|
return p.handshaked()
|
|
|
|
}
|
|
|
|
|
|
|
|
// handshaked is internal unlocked version of Handshaked().
|
|
|
|
func (p *TCPPeer) handshaked() bool {
|
2019-11-06 08:06:00 +00:00
|
|
|
return p.handShake == (verAckReceived | verAckSent | versionReceived | versionSent)
|
2019-09-13 12:43:22 +00:00
|
|
|
}
|
|
|
|
|
2020-05-22 09:17:17 +00:00
|
|
|
// IsFullNode returns whether the node has full capability or TCP/WS only.
|
|
|
|
func (p *TCPPeer) IsFullNode() bool {
|
|
|
|
p.lock.RLock()
|
|
|
|
defer p.lock.RUnlock()
|
|
|
|
return p.handshaked() && p.isFullNode
|
|
|
|
}
|
|
|
|
|
2019-09-13 12:43:22 +00:00
|
|
|
// SendVersion checks for the handshake state and sends a message to the peer.
|
2020-01-21 14:26:08 +00:00
|
|
|
func (p *TCPPeer) SendVersion() error {
|
2020-05-22 09:17:17 +00:00
|
|
|
msg, err := p.server.getVersionMsg()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-11-06 08:06:00 +00:00
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
if p.handShake&versionSent != 0 {
|
|
|
|
return errors.New("invalid handshake: already sent Version")
|
2019-09-13 12:43:22 +00:00
|
|
|
}
|
2020-05-22 09:17:17 +00:00
|
|
|
err = p.writeMsg(msg)
|
2019-09-13 12:43:22 +00:00
|
|
|
if err == nil {
|
2019-11-06 08:06:00 +00:00
|
|
|
p.handShake |= versionSent
|
2019-09-13 12:43:22 +00:00
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// HandleVersion checks for the handshake state and version message contents.
|
|
|
|
func (p *TCPPeer) HandleVersion(version *payload.Version) error {
|
2019-11-06 08:06:00 +00:00
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
if p.handShake&versionReceived != 0 {
|
|
|
|
return errors.New("invalid handshake: already received Version")
|
2019-09-13 12:43:22 +00:00
|
|
|
}
|
|
|
|
p.version = version
|
2020-05-22 09:17:17 +00:00
|
|
|
for _, cap := range version.Capabilities {
|
|
|
|
if cap.Type == capability.FullNode {
|
|
|
|
p.isFullNode = true
|
|
|
|
p.lastBlockIndex = cap.Data.(*capability.Node).StartHeight
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-06 08:06:00 +00:00
|
|
|
p.handShake |= versionReceived
|
2019-09-13 12:43:22 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SendVersionAck checks for the handshake state and sends a message to the peer.
|
|
|
|
func (p *TCPPeer) SendVersionAck(msg *Message) error {
|
2019-11-06 08:06:00 +00:00
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
if p.handShake&versionReceived == 0 {
|
|
|
|
return errors.New("invalid handshake: tried to send VersionAck, but no version received yet")
|
|
|
|
}
|
2019-11-06 15:05:50 +00:00
|
|
|
if p.handShake&versionSent == 0 {
|
|
|
|
return errors.New("invalid handshake: tried to send VersionAck, but didn't send Version yet")
|
|
|
|
}
|
2019-11-06 08:06:00 +00:00
|
|
|
if p.handShake&verAckSent != 0 {
|
|
|
|
return errors.New("invalid handshake: already sent VersionAck")
|
2019-09-13 12:43:22 +00:00
|
|
|
}
|
|
|
|
err := p.writeMsg(msg)
|
|
|
|
if err == nil {
|
2019-11-06 08:06:00 +00:00
|
|
|
p.handShake |= verAckSent
|
2019-09-13 12:43:22 +00:00
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// HandleVersionAck checks handshake sequence correctness when VerAck message
|
|
|
|
// is received.
|
|
|
|
func (p *TCPPeer) HandleVersionAck() error {
|
2019-11-06 08:06:00 +00:00
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
if p.handShake&versionSent == 0 {
|
|
|
|
return errors.New("invalid handshake: received VersionAck, but no version sent yet")
|
|
|
|
}
|
2019-11-06 15:05:50 +00:00
|
|
|
if p.handShake&versionReceived == 0 {
|
|
|
|
return errors.New("invalid handshake: received VersionAck, but no version received yet")
|
|
|
|
}
|
2019-11-06 08:06:00 +00:00
|
|
|
if p.handShake&verAckReceived != 0 {
|
|
|
|
return errors.New("invalid handshake: already received VersionAck")
|
2019-09-13 12:43:22 +00:00
|
|
|
}
|
2019-11-06 08:06:00 +00:00
|
|
|
p.handShake |= verAckReceived
|
2019-09-13 12:43:22 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-11-06 07:55:21 +00:00
|
|
|
// RemoteAddr implements the Peer interface.
|
|
|
|
func (p *TCPPeer) RemoteAddr() net.Addr {
|
|
|
|
return p.conn.RemoteAddr()
|
|
|
|
}
|
|
|
|
|
|
|
|
// PeerAddr implements the Peer interface.
|
|
|
|
func (p *TCPPeer) PeerAddr() net.Addr {
|
|
|
|
remote := p.conn.RemoteAddr()
|
|
|
|
// The network can be non-tcp in unit tests.
|
2020-01-28 17:52:24 +00:00
|
|
|
if p.version == nil || remote.Network() != "tcp" {
|
2019-11-06 07:55:21 +00:00
|
|
|
return p.RemoteAddr()
|
|
|
|
}
|
|
|
|
host, _, err := net.SplitHostPort(remote.String())
|
|
|
|
if err != nil {
|
|
|
|
return p.RemoteAddr()
|
|
|
|
}
|
2020-05-22 09:17:17 +00:00
|
|
|
var port uint16
|
|
|
|
for _, cap := range p.version.Capabilities {
|
|
|
|
if cap.Type == capability.TCPServer {
|
|
|
|
port = cap.Data.(*capability.Server).Port
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if port == 0 {
|
|
|
|
return p.RemoteAddr()
|
|
|
|
}
|
|
|
|
addrString := net.JoinHostPort(host, strconv.Itoa(int(port)))
|
2019-11-06 07:55:21 +00:00
|
|
|
tcpAddr, err := net.ResolveTCPAddr("tcp", addrString)
|
|
|
|
if err != nil {
|
|
|
|
return p.RemoteAddr()
|
|
|
|
}
|
|
|
|
return tcpAddr
|
2018-03-09 15:55:25 +00:00
|
|
|
}
|
|
|
|
|
2018-04-13 10:14:08 +00:00
|
|
|
// Disconnect will fill the peer's done channel with the given error.
|
|
|
|
func (p *TCPPeer) Disconnect(err error) {
|
2020-01-15 15:25:58 +00:00
|
|
|
p.finale.Do(func() {
|
|
|
|
close(p.done)
|
network: change the disconnect procedure
We can still lock the (*Server).run with dead peers:
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: goroutine 40 [select, 871 minutes]:
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).putPacketIntoQueue(0xc030ab5320, 0xc02f251f20, 0xc00af0dcc0, 0x18, 0x40, 0x100000000000000, 0xffffffffffffffff)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:82 +0xf4
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).EnqueueHPPacket(0xc030ab5320, 0xc00af0dcc0, 0x18, 0x40, 0x1367240, 0xc03090ef98)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:124 +0x52
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*Server).iteratePeersWithSendMsg(0xc0000ca000, 0xc00af35800, 0xcb2a58, 0x0)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:720 +0x12a
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*Server).broadcastHPMessage(...)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:731
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*Server).run(0xc0000ca000)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:203 +0xee4
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*Server).Start(0xc0000ca000, 0xc000072ba0)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/server.go:173 +0x2ec
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: created by github.com/CityOfZion/neo-go/cli/server.startServer
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/cli/server/server.go:331 +0x476
...
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: goroutine 2199 [chan send, 870 minutes]:
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).Disconnect.func1()
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:366 +0x85
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: sync.(*Once).Do(0xc030ab403c, 0xc02f262788)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/usr/local/go/src/sync/once.go:44 +0xb3
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).Disconnect(0xc030ab4000, 0xd92440, 0xc000065a00)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:365 +0x6d
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).SendPing.func1()
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:394 +0x42
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: created by time.goFunc
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/usr/local/go/src/time/sleep.go:169 +0x44
...
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: goroutine 3448 [chan send, 854 minutes]:
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: github.com/CityOfZion/neo-go/pkg/network.(*TCPPeer).handleConn(0xc01ed203f0)
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_peer.go:143 +0x6c
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: created by github.com/CityOfZion/neo-go/pkg/network.(*TCPTransport).Accept
Feb 13 16:14:50 neo-go-node-2 neo-go[9448]: #011/go/src/github.com/CityOfZion/neo-go/pkg/network/tcp_transport.go:62 +0x44c
...
The problem is that the select in putPacketIntoQueue() only works the way it
was intended to after the `close(p.done)`, but that happens only after
successful unregistration request send. Thus, do disconnects the other way
around, first unblock queueing and exit goroutines, then destroy the
connection (if it wasn't previously destroyed) and only after that signal to
the Server.
2020-02-13 13:24:46 +00:00
|
|
|
p.conn.Close()
|
|
|
|
p.server.unregister <- peerDrop{p, err}
|
2020-01-15 15:25:58 +00:00
|
|
|
})
|
2018-04-13 10:14:08 +00:00
|
|
|
}
|
|
|
|
|
2018-03-14 09:36:59 +00:00
|
|
|
// Version implements the Peer interface.
|
|
|
|
func (p *TCPPeer) Version() *payload.Version {
|
|
|
|
return p.version
|
2018-03-10 12:04:06 +00:00
|
|
|
}
|
2020-01-17 10:17:19 +00:00
|
|
|
|
2022-04-20 18:30:09 +00:00
|
|
|
// LastBlockIndex returns the last block index.
|
2020-01-17 10:17:19 +00:00
|
|
|
func (p *TCPPeer) LastBlockIndex() uint32 {
|
|
|
|
p.lock.RLock()
|
|
|
|
defer p.lock.RUnlock()
|
|
|
|
return p.lastBlockIndex
|
|
|
|
}
|
|
|
|
|
2022-10-12 12:25:03 +00:00
|
|
|
// SetPingTimer adds an outgoing ping to the counter and sets a PingTimeout timer
|
|
|
|
// that will shut the connection down in case of no response.
|
|
|
|
func (p *TCPPeer) SetPingTimer() {
|
2020-01-17 10:17:19 +00:00
|
|
|
p.lock.Lock()
|
2020-01-20 16:02:19 +00:00
|
|
|
p.pingSent++
|
|
|
|
if p.pingTimer == nil {
|
|
|
|
p.pingTimer = time.AfterFunc(p.server.PingTimeout, func() {
|
|
|
|
p.Disconnect(errPingPong)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
p.lock.Unlock()
|
2020-01-17 10:17:19 +00:00
|
|
|
}
|
|
|
|
|
2020-08-14 13:22:15 +00:00
|
|
|
// HandlePing handles a ping message received from the peer.
|
|
|
|
func (p *TCPPeer) HandlePing(ping *payload.Ping) error {
|
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
p.lastBlockIndex = ping.LastBlockIndex
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-04-20 18:30:09 +00:00
|
|
|
// HandlePong handles a pong message received from the peer and does an appropriate
|
2020-01-20 16:02:19 +00:00
|
|
|
// accounting of outstanding pings and timeouts.
|
|
|
|
func (p *TCPPeer) HandlePong(pong *payload.Ping) error {
|
2020-01-17 10:17:19 +00:00
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
2020-01-20 16:02:19 +00:00
|
|
|
if p.pingTimer != nil && !p.pingTimer.Stop() {
|
|
|
|
return errPingPong
|
|
|
|
}
|
|
|
|
p.pingTimer = nil
|
|
|
|
p.pingSent--
|
|
|
|
if p.pingSent < 0 {
|
|
|
|
return errUnexpectedPong
|
|
|
|
}
|
|
|
|
p.lastBlockIndex = pong.LastBlockIndex
|
|
|
|
return nil
|
2020-01-17 10:17:19 +00:00
|
|
|
}
|
2020-11-25 10:34:38 +00:00
|
|
|
|
2022-04-20 18:30:09 +00:00
|
|
|
// AddGetAddrSent increments internal outstanding getaddr requests counter. Then,
|
|
|
|
// the peer can only send one addr reply per getaddr request.
|
2020-11-25 10:34:38 +00:00
|
|
|
func (p *TCPPeer) AddGetAddrSent() {
|
|
|
|
p.getAddrSent.Inc()
|
|
|
|
}
|
|
|
|
|
|
|
|
// CanProcessAddr decrements internal outstanding getaddr requests counter and
|
|
|
|
// answers whether the addr command from the peer can be safely processed.
|
|
|
|
func (p *TCPPeer) CanProcessAddr() bool {
|
|
|
|
v := p.getAddrSent.Dec()
|
|
|
|
return v >= 0
|
|
|
|
}
|