network: add ping pong processing

add pingInterval same as used in ref C# implementation with the same logic
add pingTimeout which is used to check whether pong received. If not -- drop the peer.
add pingLimit which is hardcoded to 4 in TCPPeer. It's limit for unsuccessful ping/pong calls (where pong wasn't received in pingTimeout interval)
This commit is contained in:
Vsevolod Brekelov 2020-01-17 13:17:19 +03:00
parent 7ba5267494
commit 4e6ed9021c
18 changed files with 195 additions and 3 deletions

View file

@ -73,6 +73,8 @@ type (
Relay bool `yaml:"Relay"`
DialTimeout time.Duration `yaml:"DialTimeout"`
ProtoTickInterval time.Duration `yaml:"ProtoTickInterval"`
PingInterval time.Duration `yaml:"PingInterval"`
PingTimeout time.Duration `yaml:"PingTimeout"`
MaxPeers int `yaml:"MaxPeers"`
AttemptConnPeers int `yaml:"AttemptConnPeers"`
MinPeers int `yaml:"MinPeers"`

View file

@ -50,6 +50,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 100
AttemptConnPeers: 20
MinPeers: 5

View file

@ -41,6 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -41,6 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -35,6 +35,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 0

View file

@ -41,6 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -41,6 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -41,6 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -50,6 +50,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 100
AttemptConnPeers: 20
MinPeers: 5

View file

@ -40,6 +40,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
MaxPeers: 50
AttemptConnPeers: 5
MinPeers: 1

View file

@ -153,9 +153,11 @@ var defaultMessageHandler = func(t *testing.T, msg *Message) {}
type localPeer struct {
netaddr net.TCPAddr
version *payload.Version
lastBlockIndex uint32
handshaked bool
t *testing.T
messageHandler func(t *testing.T, msg *Message)
pingSent int
}
func newLocalPeer(t *testing.T) *localPeer {
@ -185,6 +187,12 @@ func (p *localPeer) Done() chan error {
func (p *localPeer) Version() *payload.Version {
return p.version
}
func (p *localPeer) LastBlockIndex() uint32 {
return p.lastBlockIndex
}
func (p *localPeer) UpdateLastBlockIndex(newIndex uint32) {
p.lastBlockIndex = newIndex
}
func (p *localPeer) HandleVersion(v *payload.Version) error {
p.version = v
return nil
@ -199,6 +207,12 @@ func (p *localPeer) HandleVersionAck() error {
p.handshaked = true
return nil
}
func (p *localPeer) GetPingSent() int {
return p.pingSent
}
func (p *localPeer) UpdatePingSent(newValue int) {
p.pingSent = newValue
}
func (p *localPeer) Handshaked() bool {
return p.handshaked

View file

@ -197,6 +197,8 @@ func (m *Message) decodePayload(br *io.BinReader) error {
p = &transaction.Transaction{}
case CMDMerkleBlock:
p = &payload.MerkleBlock{}
case CMDPing, CMDPong:
p = &payload.Ping{}
default:
return fmt.Errorf("can't decode command %s", cmdByteArrayToString(m.Command))
}

View file

@ -0,0 +1,40 @@
package payload
import (
"time"
"github.com/CityOfZion/neo-go/pkg/io"
)
// Ping payload for ping/pong payloads.
type Ping struct {
// Index of the last block.
LastBlockIndex uint32
// Timestamp.
Timestamp uint32
// Nonce of the server.
Nonce uint32
}
// NewPing creates new Ping payload.
func NewPing(blockIndex uint32, nonce uint32) *Ping {
return &Ping{
LastBlockIndex: blockIndex,
Timestamp: uint32(time.Now().UTC().Unix()),
Nonce: nonce,
}
}
// DecodeBinary implements Serializable interface.
func (p *Ping) DecodeBinary(br *io.BinReader) {
p.LastBlockIndex = br.ReadU32LE()
p.Timestamp = br.ReadU32LE()
p.Nonce = br.ReadU32LE()
}
// EncodeBinary implements Serializable interface.
func (p *Ping) EncodeBinary(bw *io.BinWriter) {
bw.WriteU32LE(p.LastBlockIndex)
bw.WriteU32LE(p.Timestamp)
bw.WriteU32LE(p.Nonce)
}

View file

@ -0,0 +1,25 @@
package payload
import (
"testing"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/stretchr/testify/assert"
)
func TestEncodeDecodeBinary(t *testing.T) {
payload := NewPing(uint32(1), uint32(2))
assert.NotEqual(t, 0, payload.Timestamp)
bufBinWriter := io.NewBufBinWriter()
payload.EncodeBinary(bufBinWriter.BinWriter)
assert.Nil(t, bufBinWriter.Err)
binReader := io.NewBinReaderFromBuf(bufBinWriter.Bytes())
decodedPing := &Ping{}
decodedPing.DecodeBinary(binReader)
assert.Nil(t, binReader.Err)
assert.Equal(t, uint32(1), decodedPing.LastBlockIndex)
assert.Equal(t, uint32(2), decodedPing.Nonce)
}

View file

@ -21,9 +21,13 @@ type Peer interface {
WriteMsg(msg *Message) error
Done() chan error
Version() *payload.Version
LastBlockIndex() uint32
UpdateLastBlockIndex(lbIndex uint32)
Handshaked() bool
SendVersion(*Message) error
SendVersionAck(*Message) error
HandleVersion(*payload.Version) error
HandleVersionAck() error
GetPingSent() int
UpdatePingSent(int)
}

View file

@ -27,6 +27,7 @@ const (
maxBlockBatch = 200
maxAddrsToSend = 200
minPoolCount = 30
defaultPingLimit = 4
)
var (
@ -314,7 +315,7 @@ func (s *Server) startProtocol(p Peer) {
zap.Uint32("id", p.Version().Nonce))
s.discovery.RegisterGoodAddr(p.PeerAddr().String())
if s.chain.HeaderHeight() < p.Version().StartHeight {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
err = s.requestHeaders(p)
if err != nil {
p.Disconnect(err)
@ -323,6 +324,7 @@ func (s *Server) startProtocol(p Peer) {
}
timer := time.NewTimer(s.ProtoTickInterval)
pingTimer := time.NewTimer(s.PingTimeout)
for {
select {
case err = <-p.Done():
@ -331,12 +333,31 @@ func (s *Server) startProtocol(p Peer) {
err = p.WriteMsg(m)
case <-timer.C:
// Try to sync in headers and block with the peer if his block height is higher then ours.
if p.Version().StartHeight > s.chain.BlockHeight() {
if p.LastBlockIndex() > s.chain.BlockHeight() {
err = s.requestBlocks(p)
}
if err == nil {
timer.Reset(s.ProtoTickInterval)
}
if s.chain.HeaderHeight() >= p.LastBlockIndex() {
block, errGetBlock := s.chain.GetBlock(s.chain.CurrentBlockHash())
if errGetBlock != nil {
err = errGetBlock
} else {
diff := uint32(time.Now().UTC().Unix()) - block.Timestamp
if diff > uint32(s.PingInterval/time.Second) {
p.UpdatePingSent(p.GetPingSent() + 1)
err = p.WriteMsg(NewMessage(s.Net, CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight())))
}
}
}
case <-pingTimer.C:
if p.GetPingSent() > defaultPingLimit {
err = errors.New("ping/pong timeout")
} else {
pingTimer.Reset(s.PingTimeout)
p.UpdatePingSent(0)
}
}
if err != nil {
s.unregister <- peerDrop{p, err}
@ -394,7 +415,7 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) {
// The peer will respond with a maximum of 2000 headers in one batch.
// We will ask one more batch here if needed. Eventually we will get synced
// due to the startProtocol routine that will ask headers every protoTick.
if s.chain.HeaderHeight() < p.Version().StartHeight {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
s.requestHeaders(p)
}
}
@ -404,6 +425,25 @@ func (s *Server) handleBlockCmd(p Peer, block *core.Block) error {
return s.bQueue.putBlock(block)
}
// handlePing processes ping request.
func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
return p.WriteMsg(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight())))
}
// handlePing processes pong request.
func (s *Server) handlePong(p Peer, pong *payload.Ping) error {
pingSent := p.GetPingSent()
if pingSent == 0 {
return errors.New("pong message wasn't expected")
}
p.UpdatePingSent(pingSent - 1)
p.UpdateLastBlockIndex(pong.LastBlockIndex)
if s.chain.HeaderHeight() < pong.LastBlockIndex {
return s.requestHeaders(p)
}
return nil
}
// handleInvCmd processes the received inventory.
func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
reqHashes := make([]util.Uint256, 0)
@ -640,6 +680,12 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
case CMDTX:
tx := msg.Payload.(*transaction.Transaction)
return s.handleTxCmd(tx)
case CMDPing:
ping := msg.Payload.(*payload.Ping)
return s.handlePing(peer, ping)
case CMDPong:
pong := msg.Payload.(*payload.Ping)
return s.handlePong(peer, pong)
case CMDVersion, CMDVerack:
return fmt.Errorf("received '%s' after the handshake", msg.CommandType())
}

View file

@ -52,6 +52,11 @@ type (
// When this is 0, the default interval of 5 seconds will be used.
ProtoTickInterval time.Duration
// Interval used in pinging mechanism for syncing blocks.
PingInterval time.Duration
// Time to wait for pong(response for sent ping request).
PingTimeout time.Duration
// Level of the internal logger.
LogLevel zapcore.Level
@ -83,6 +88,8 @@ func NewServerConfig(cfg config.Config) ServerConfig {
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.DialTimeout * time.Second,
ProtoTickInterval: appConfig.ProtoTickInterval * time.Second,
PingInterval: appConfig.PingInterval * time.Second,
PingTimeout: appConfig.PingTimeout * time.Second,
MaxPeers: appConfig.MaxPeers,
AttemptConnPeers: appConfig.AttemptConnPeers,
MinPeers: appConfig.MinPeers,

View file

@ -31,6 +31,8 @@ type TCPPeer struct {
// The version of the peer.
version *payload.Version
// Index of the last block.
lastBlockIndex uint32
lock sync.RWMutex
handShake handShakeStage
@ -38,6 +40,9 @@ type TCPPeer struct {
done chan error
wg sync.WaitGroup
// number of sent pings.
pingSent int
}
// NewTCPPeer returns a TCPPeer structure based on the given connection.
@ -103,6 +108,7 @@ func (p *TCPPeer) HandleVersion(version *payload.Version) error {
return errors.New("invalid handshake: already received Version")
}
p.version = version
p.lastBlockIndex = version.StartHeight
p.handShake |= versionReceived
return nil
}
@ -191,3 +197,31 @@ func (p *TCPPeer) Disconnect(err error) {
func (p *TCPPeer) Version() *payload.Version {
return p.version
}
// LastBlockIndex returns last block index.
func (p *TCPPeer) LastBlockIndex() uint32 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.lastBlockIndex
}
// UpdateLastBlockIndex updates last block index.
func (p *TCPPeer) UpdateLastBlockIndex(newIndex uint32) {
p.lock.Lock()
defer p.lock.Unlock()
p.lastBlockIndex = newIndex
}
// GetPingSent returns flag whether ping was sent or not.
func (p *TCPPeer) GetPingSent() int {
p.lock.RLock()
defer p.lock.RUnlock()
return p.pingSent
}
// UpdatePingSent updates pingSent value.
func (p *TCPPeer) UpdatePingSent(newValue int) {
p.lock.Lock()
defer p.lock.Unlock()
p.pingSent = newValue
}