From 247cfa416531e896938233ff6f07e4fbacbc6d12 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 20 Jan 2020 16:58:28 +0300 Subject: [PATCH 1/5] network: either request blocks or ping a peer, but not both It makes to sense to do both actions, pings are made for a different purpose. Relates to #430. --- pkg/network/tcp_peer.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 62ceb417d..8c56f9d45 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -207,11 +207,7 @@ func (p *TCPPeer) StartProtocol() { // Try to sync in headers and block with the peer if his block height is higher then ours. if p.LastBlockIndex() > p.server.chain.BlockHeight() { err = p.server.requestBlocks(p) - } - if err == nil { - timer.Reset(p.server.ProtoTickInterval) - } - if p.server.chain.HeaderHeight() >= p.LastBlockIndex() { + } else if p.server.chain.HeaderHeight() >= p.LastBlockIndex() { block, errGetBlock := p.server.chain.GetBlock(p.server.chain.CurrentBlockHash()) if errGetBlock != nil { err = errGetBlock @@ -223,6 +219,9 @@ func (p *TCPPeer) StartProtocol() { } } } + if err == nil { + timer.Reset(p.server.ProtoTickInterval) + } case <-pingTimer.C: if p.GetPingSent() > defaultPingLimit { err = errors.New("ping/pong timeout") From a8252ecc05eba32b6654dbd8250e6b04d407ca79 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 20 Jan 2020 17:19:35 +0300 Subject: [PATCH 2/5] network: remove wrong ping condition In reality it will never be true exactly in the case where we want this ping mechanism to work --- when the node failed to get a block from the net. It won't get the header either and thus its block height will be equal to header height. The only moment when this condition is met is when the node does initial synchronization and this synchronization works just fine without any pings. Relates to #430. --- pkg/network/tcp_peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 8c56f9d45..2c408b22d 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -207,7 +207,7 @@ func (p *TCPPeer) StartProtocol() { // Try to sync in headers and block with the peer if his block height is higher then ours. if p.LastBlockIndex() > p.server.chain.BlockHeight() { err = p.server.requestBlocks(p) - } else if p.server.chain.HeaderHeight() >= p.LastBlockIndex() { + } else { block, errGetBlock := p.server.chain.GetBlock(p.server.chain.CurrentBlockHash()) if errGetBlock != nil { err = errGetBlock From 62092c703d20874eb93711b33482cd68843a775f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 20 Jan 2020 17:39:59 +0300 Subject: [PATCH 3/5] network: use local timestamp to decide when to ping We don't and we won't have synchronized clocks in the network so the only timestamp that we can compare our local time with is the one made ourselves. What this ping mechanism is used for is to recover from missing the block broadcast, thus it's appropriate for it to trigger after X seconds of the local time since the last block received. Relates to #430. --- pkg/network/server.go | 10 ++++++++++ pkg/network/tcp_peer.go | 13 ++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index cf11d1df6..6c98ed665 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -68,6 +68,8 @@ type ( quit chan struct{} connected *atomic.Bool + // Time of the last block receival. + lastBlockTS *atomic.Int64 log *zap.Logger } @@ -101,6 +103,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) *S unregister: make(chan peerDrop), peers: make(map[Peer]bool), connected: atomic.NewBool(false), + lastBlockTS: atomic.NewInt64(0), log: log, } @@ -359,6 +362,7 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) { // handleBlockCmd processes the received block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { + s.lastBlockTS.Store(time.Now().UTC().Unix()) return s.bQueue.putBlock(block) } @@ -658,6 +662,12 @@ func (s *Server) handleNewPayload(p *consensus.Payload) { s.relayInventoryCmd(CMDInv, payload.ConsensusType, p.Hash()) } +// getLastBlockTime returns unix timestamp for the moment when the last block +// was received. +func (s *Server) getLastBlockTime() int64 { + return s.lastBlockTS.Load() +} + func (s *Server) requestTx(hashes ...util.Uint256) { if len(hashes) == 0 { return diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 2c408b22d..329a51130 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -208,15 +208,10 @@ func (p *TCPPeer) StartProtocol() { if p.LastBlockIndex() > p.server.chain.BlockHeight() { err = p.server.requestBlocks(p) } else { - block, errGetBlock := p.server.chain.GetBlock(p.server.chain.CurrentBlockHash()) - if errGetBlock != nil { - err = errGetBlock - } else { - diff := uint32(time.Now().UTC().Unix()) - block.Timestamp - if diff > uint32(p.server.PingInterval/time.Second) { - p.UpdatePingSent(p.GetPingSent() + 1) - err = p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) - } + diff := time.Now().UTC().Unix() - p.server.getLastBlockTime() + if diff > int64(p.server.PingInterval/time.Second) { + p.UpdatePingSent(p.GetPingSent() + 1) + err = p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) } } if err == nil { From 2c4ace022e455033381337aafa3e0ce75db709e6 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 20 Jan 2020 19:02:19 +0300 Subject: [PATCH 4/5] network/config: redesign ping timeout handling a bit 1) Make timeout a timeout, don't do magic ping counts. 2) Drop additional timer from the main peer's protocol loop, create it dynamically and make it disconnect the peer. 3) Don't expose the ping counter to the outside, handle more logic inside the Peer. Relates to #430. --- config/protocol.mainnet.yml | 4 +- config/protocol.privnet.docker.four.yml | 4 +- config/protocol.privnet.docker.one.yml | 4 +- config/protocol.privnet.docker.single.yml | 4 +- config/protocol.privnet.docker.three.yml | 4 +- config/protocol.privnet.docker.two.yml | 4 +- config/protocol.privnet.yml | 4 +- config/protocol.testnet.yml | 4 +- config/protocol.unit_testnet.yml | 4 +- pkg/network/helper_test.go | 16 +++---- pkg/network/peer.go | 11 +++-- pkg/network/server.go | 9 ++-- pkg/network/tcp_peer.go | 56 ++++++++++++----------- 13 files changed, 67 insertions(+), 61 deletions(-) diff --git a/config/protocol.mainnet.yml b/config/protocol.mainnet.yml index 2fd9c94b6..2e50755ae 100644 --- a/config/protocol.mainnet.yml +++ b/config/protocol.mainnet.yml @@ -50,8 +50,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 100 AttemptConnPeers: 20 MinPeers: 5 diff --git a/config/protocol.privnet.docker.four.yml b/config/protocol.privnet.docker.four.yml index 4981f4fee..2fe502d32 100644 --- a/config/protocol.privnet.docker.four.yml +++ b/config/protocol.privnet.docker.four.yml @@ -41,8 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.privnet.docker.one.yml b/config/protocol.privnet.docker.one.yml index 28f7e9a3e..95792063c 100644 --- a/config/protocol.privnet.docker.one.yml +++ b/config/protocol.privnet.docker.one.yml @@ -41,8 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.privnet.docker.single.yml b/config/protocol.privnet.docker.single.yml index 3c12813aa..fbc946ac8 100644 --- a/config/protocol.privnet.docker.single.yml +++ b/config/protocol.privnet.docker.single.yml @@ -35,8 +35,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 0 diff --git a/config/protocol.privnet.docker.three.yml b/config/protocol.privnet.docker.three.yml index d0b83f35a..b36dae766 100644 --- a/config/protocol.privnet.docker.three.yml +++ b/config/protocol.privnet.docker.three.yml @@ -41,8 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.privnet.docker.two.yml b/config/protocol.privnet.docker.two.yml index 9936ae13d..8406c4a19 100644 --- a/config/protocol.privnet.docker.two.yml +++ b/config/protocol.privnet.docker.two.yml @@ -41,8 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.privnet.yml b/config/protocol.privnet.yml index 247faee82..6166bd308 100644 --- a/config/protocol.privnet.yml +++ b/config/protocol.privnet.yml @@ -41,8 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index 6bc778911..f936dc261 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -50,8 +50,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 100 AttemptConnPeers: 20 MinPeers: 5 diff --git a/config/protocol.unit_testnet.yml b/config/protocol.unit_testnet.yml index 7fe53358c..09cfef518 100644 --- a/config/protocol.unit_testnet.yml +++ b/config/protocol.unit_testnet.yml @@ -40,8 +40,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 - PingInterval: 60 - PingTimeout: 60 + PingInterval: 30 + PingTimeout: 90 MaxPeers: 50 AttemptConnPeers: 5 MinPeers: 1 diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index f41685f78..30ba4321a 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -160,7 +160,7 @@ type localPeer struct { handshaked bool t *testing.T messageHandler func(t *testing.T, msg *Message) - pingSent int + pingSent int } func newLocalPeer(t *testing.T) *localPeer { @@ -206,9 +206,6 @@ func (p *localPeer) Version() *payload.Version { func (p *localPeer) LastBlockIndex() uint32 { return p.lastBlockIndex } -func (p *localPeer) UpdateLastBlockIndex(newIndex uint32) { - p.lastBlockIndex = newIndex -} func (p *localPeer) HandleVersion(v *payload.Version) error { p.version = v return nil @@ -225,11 +222,14 @@ func (p *localPeer) HandleVersionAck() error { p.handshaked = true return nil } -func (p *localPeer) GetPingSent() int { - return p.pingSent +func (p *localPeer) SendPing() error { + p.pingSent++ + return nil } -func (p *localPeer) UpdatePingSent(newValue int) { - p.pingSent = newValue +func (p *localPeer) HandlePong(pong *payload.Ping) error { + p.lastBlockIndex = pong.LastBlockIndex + p.pingSent-- + return nil } func (p *localPeer) Handshaked() bool { diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 3fe9cb23d..d063c5ddf 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -36,8 +36,12 @@ type Peer interface { EnqueueHPPacket([]byte) error Version() *payload.Version LastBlockIndex() uint32 - UpdateLastBlockIndex(lbIndex uint32) Handshaked() bool + + // SendPing enqueues a ping message to be sent to the peer and does + // appropriate protocol handling like timeouts and outstanding pings + // management. + SendPing() error SendVersion(*Message) error SendVersionAck(*Message) error // StartProtocol is a goroutine to be run after the handshake. It @@ -45,6 +49,7 @@ type Peer interface { StartProtocol() HandleVersion(*payload.Version) error HandleVersionAck() error - GetPingSent() int - UpdatePingSent(int) + + // HandlePong checks pong contents against Peer's state and updates it. + HandlePong(pong *payload.Ping) error } diff --git a/pkg/network/server.go b/pkg/network/server.go index 6c98ed665..3c7e942e8 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -29,7 +29,6 @@ const ( maxBlockBatch = 200 maxAddrsToSend = 200 minPoolCount = 30 - defaultPingLimit = 4 ) var ( @@ -373,12 +372,10 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { // handlePing processes pong request. func (s *Server) handlePong(p Peer, pong *payload.Ping) error { - pingSent := p.GetPingSent() - if pingSent == 0 { - return errors.New("pong message wasn't expected") + err := p.HandlePong(pong) + if err != nil { + return err } - p.UpdatePingSent(pingSent - 1) - p.UpdateLastBlockIndex(pong.LastBlockIndex) if s.chain.HeaderHeight() < pong.LastBlockIndex { return s.requestHeaders(p) } diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 329a51130..5683cfe39 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -25,7 +25,9 @@ const ( ) var ( - errStateMismatch = errors.New("tried to send protocol message before handshake completed") + errStateMismatch = errors.New("tried to send protocol message before handshake completed") + errPingPong = errors.New("ping/pong timeout") + errUnexpectedPong = errors.New("pong message wasn't expected") ) // TCPPeer represents a connected remote node in the @@ -51,7 +53,8 @@ type TCPPeer struct { wg sync.WaitGroup // number of sent pings. - pingSent int + pingSent int + pingTimer *time.Timer } // NewTCPPeer returns a TCPPeer structure based on the given connection. @@ -191,7 +194,6 @@ func (p *TCPPeer) StartProtocol() { } timer := time.NewTimer(p.server.ProtoTickInterval) - pingTimer := time.NewTimer(p.server.PingTimeout) for { select { case <-p.done: @@ -210,20 +212,12 @@ func (p *TCPPeer) StartProtocol() { } else { diff := time.Now().UTC().Unix() - p.server.getLastBlockTime() if diff > int64(p.server.PingInterval/time.Second) { - p.UpdatePingSent(p.GetPingSent() + 1) - err = p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) + err = p.SendPing() } } if err == nil { timer.Reset(p.server.ProtoTickInterval) } - case <-pingTimer.C: - if p.GetPingSent() > defaultPingLimit { - err = errors.New("ping/pong timeout") - } else { - pingTimer.Reset(p.server.PingTimeout) - p.UpdatePingSent(0) - } } if err != nil { timer.Stop() @@ -350,23 +344,33 @@ func (p *TCPPeer) LastBlockIndex() uint32 { return p.lastBlockIndex } -// UpdateLastBlockIndex updates last block index. -func (p *TCPPeer) UpdateLastBlockIndex(newIndex uint32) { +// SendPing sends a ping message to the peer and does appropriate accounting of +// outstanding pings and timeouts. +func (p *TCPPeer) SendPing() error { p.lock.Lock() - defer p.lock.Unlock() - p.lastBlockIndex = newIndex + p.pingSent++ + if p.pingTimer == nil { + p.pingTimer = time.AfterFunc(p.server.PingTimeout, func() { + p.Disconnect(errPingPong) + }) + } + p.lock.Unlock() + return p.EnqueueMessage(NewMessage(p.server.Net, CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) } -// GetPingSent returns flag whether ping was sent or not. -func (p *TCPPeer) GetPingSent() int { - p.lock.RLock() - defer p.lock.RUnlock() - return p.pingSent -} - -// UpdatePingSent updates pingSent value. -func (p *TCPPeer) UpdatePingSent(newValue int) { +// HandlePong handles a pong message received from the peer and does appropriate +// accounting of outstanding pings and timeouts. +func (p *TCPPeer) HandlePong(pong *payload.Ping) error { p.lock.Lock() defer p.lock.Unlock() - p.pingSent = newValue + if p.pingTimer != nil && !p.pingTimer.Stop() { + return errPingPong + } + p.pingTimer = nil + p.pingSent-- + if p.pingSent < 0 { + return errUnexpectedPong + } + p.lastBlockIndex = pong.LastBlockIndex + return nil } From f56383e9c875c49eca8cd503d376d70470779cbc Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 20 Jan 2020 19:17:44 +0300 Subject: [PATCH 5/5] network: use p.LastBlockIndex() in requestBlocks() Always compare to the best known block index, comparing to the StartHeight is just plain wrong now. --- pkg/network/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 3c7e942e8..3bf0fa249 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -575,7 +575,7 @@ func (s *Server) requestBlocks(p Peer) error { if len(hashes) > 0 { payload := payload.NewInventory(payload.BlockType, hashes) return p.EnqueueMessage(NewMessage(s.Net, CMDGetData, payload)) - } else if s.chain.HeaderHeight() < p.Version().StartHeight { + } else if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) } return nil