network/config: redesign ping timeout handling a bit

1) Make timeout a timeout, don't do magic ping counts.
2) Drop additional timer from the main peer's protocol loop, create it
   dynamically and make it disconnect the peer.
3) Don't expose the ping counter to the outside, handle more logic inside the
   Peer.

Relates to #430.
This commit is contained in:
Roman Khimov 2020-01-20 19:02:19 +03:00
parent 62092c703d
commit 2c4ace022e
13 changed files with 67 additions and 61 deletions

View file

@ -50,8 +50,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 100
AttemptConnPeers: 20
MinPeers: 5

View file

@ -41,8 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -41,8 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -35,8 +35,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 0

View file

@ -41,8 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -41,8 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -41,8 +41,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 10
AttemptConnPeers: 5
MinPeers: 3

View file

@ -50,8 +50,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 100
AttemptConnPeers: 20
MinPeers: 5

View file

@ -40,8 +40,8 @@ ApplicationConfiguration:
Relay: true
DialTimeout: 3
ProtoTickInterval: 2
PingInterval: 60
PingTimeout: 60
PingInterval: 30
PingTimeout: 90
MaxPeers: 50
AttemptConnPeers: 5
MinPeers: 1

View file

@ -160,7 +160,7 @@ type localPeer struct {
handshaked bool
t *testing.T
messageHandler func(t *testing.T, msg *Message)
pingSent int
pingSent int
}
func newLocalPeer(t *testing.T) *localPeer {
@ -206,9 +206,6 @@ func (p *localPeer) Version() *payload.Version {
func (p *localPeer) LastBlockIndex() uint32 {
return p.lastBlockIndex
}
func (p *localPeer) UpdateLastBlockIndex(newIndex uint32) {
p.lastBlockIndex = newIndex
}
func (p *localPeer) HandleVersion(v *payload.Version) error {
p.version = v
return nil
@ -225,11 +222,14 @@ func (p *localPeer) HandleVersionAck() error {
p.handshaked = true
return nil
}
func (p *localPeer) GetPingSent() int {
return p.pingSent
func (p *localPeer) SendPing() error {
p.pingSent++
return nil
}
func (p *localPeer) UpdatePingSent(newValue int) {
p.pingSent = newValue
func (p *localPeer) HandlePong(pong *payload.Ping) error {
p.lastBlockIndex = pong.LastBlockIndex
p.pingSent--
return nil
}
func (p *localPeer) Handshaked() bool {

View file

@ -36,8 +36,12 @@ type Peer interface {
EnqueueHPPacket([]byte) error
Version() *payload.Version
LastBlockIndex() uint32
UpdateLastBlockIndex(lbIndex uint32)
Handshaked() bool
// SendPing enqueues a ping message to be sent to the peer and does
// appropriate protocol handling like timeouts and outstanding pings
// management.
SendPing() error
SendVersion(*Message) error
SendVersionAck(*Message) error
// StartProtocol is a goroutine to be run after the handshake. It
@ -45,6 +49,7 @@ type Peer interface {
StartProtocol()
HandleVersion(*payload.Version) error
HandleVersionAck() error
GetPingSent() int
UpdatePingSent(int)
// HandlePong checks pong contents against Peer's state and updates it.
HandlePong(pong *payload.Ping) error
}

View file

@ -29,7 +29,6 @@ const (
maxBlockBatch = 200
maxAddrsToSend = 200
minPoolCount = 30
defaultPingLimit = 4
)
var (
@ -373,12 +372,10 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
// handlePing processes pong request.
func (s *Server) handlePong(p Peer, pong *payload.Ping) error {
pingSent := p.GetPingSent()
if pingSent == 0 {
return errors.New("pong message wasn't expected")
err := p.HandlePong(pong)
if err != nil {
return err
}
p.UpdatePingSent(pingSent - 1)
p.UpdateLastBlockIndex(pong.LastBlockIndex)
if s.chain.HeaderHeight() < pong.LastBlockIndex {
return s.requestHeaders(p)
}

View file

@ -25,7 +25,9 @@ const (
)
var (
errStateMismatch = errors.New("tried to send protocol message before handshake completed")
errStateMismatch = errors.New("tried to send protocol message before handshake completed")
errPingPong = errors.New("ping/pong timeout")
errUnexpectedPong = errors.New("pong message wasn't expected")
)
// TCPPeer represents a connected remote node in the
@ -51,7 +53,8 @@ type TCPPeer struct {
wg sync.WaitGroup
// number of sent pings.
pingSent int
pingSent int
pingTimer *time.Timer
}
// NewTCPPeer returns a TCPPeer structure based on the given connection.
@ -191,7 +194,6 @@ func (p *TCPPeer) StartProtocol() {
}
timer := time.NewTimer(p.server.ProtoTickInterval)
pingTimer := time.NewTimer(p.server.PingTimeout)
for {
select {
case <-p.done:
@ -210,20 +212,12 @@ func (p *TCPPeer) StartProtocol() {
} else {
diff := time.Now().UTC().Unix() - p.server.getLastBlockTime()
if diff > int64(p.server.PingInterval/time.Second) {
p.UpdatePingSent(p.GetPingSent() + 1)
err = p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight())))
err = p.SendPing()
}
}
if err == nil {
timer.Reset(p.server.ProtoTickInterval)
}
case <-pingTimer.C:
if p.GetPingSent() > defaultPingLimit {
err = errors.New("ping/pong timeout")
} else {
pingTimer.Reset(p.server.PingTimeout)
p.UpdatePingSent(0)
}
}
if err != nil {
timer.Stop()
@ -350,23 +344,33 @@ func (p *TCPPeer) LastBlockIndex() uint32 {
return p.lastBlockIndex
}
// UpdateLastBlockIndex updates last block index.
func (p *TCPPeer) UpdateLastBlockIndex(newIndex uint32) {
// SendPing sends a ping message to the peer and does appropriate accounting of
// outstanding pings and timeouts.
func (p *TCPPeer) SendPing() error {
p.lock.Lock()
defer p.lock.Unlock()
p.lastBlockIndex = newIndex
p.pingSent++
if p.pingTimer == nil {
p.pingTimer = time.AfterFunc(p.server.PingTimeout, func() {
p.Disconnect(errPingPong)
})
}
p.lock.Unlock()
return p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight())))
}
// GetPingSent returns flag whether ping was sent or not.
func (p *TCPPeer) GetPingSent() int {
p.lock.RLock()
defer p.lock.RUnlock()
return p.pingSent
}
// UpdatePingSent updates pingSent value.
func (p *TCPPeer) UpdatePingSent(newValue int) {
// HandlePong handles a pong message received from the peer and does appropriate
// accounting of outstanding pings and timeouts.
func (p *TCPPeer) HandlePong(pong *payload.Ping) error {
p.lock.Lock()
defer p.lock.Unlock()
p.pingSent = newValue
if p.pingTimer != nil && !p.pingTimer.Stop() {
return errPingPong
}
p.pingTimer = nil
p.pingSent--
if p.pingSent < 0 {
return errUnexpectedPong
}
p.lastBlockIndex = pong.LastBlockIndex
return nil
}