diff --git a/config/config.go b/config/config.go index f962ec41c..bda9d07c4 100644 --- a/config/config.go +++ b/config/config.go @@ -73,6 +73,8 @@ type ( Relay bool `yaml:"Relay"` DialTimeout time.Duration `yaml:"DialTimeout"` ProtoTickInterval time.Duration `yaml:"ProtoTickInterval"` + PingInterval time.Duration `yaml:"PingInterval"` + PingTimeout time.Duration `yaml:"PingTimeout"` MaxPeers int `yaml:"MaxPeers"` AttemptConnPeers int `yaml:"AttemptConnPeers"` MinPeers int `yaml:"MinPeers"` diff --git a/config/protocol.mainnet.yml b/config/protocol.mainnet.yml index 0a6b125f5..2fd9c94b6 100644 --- a/config/protocol.mainnet.yml +++ b/config/protocol.mainnet.yml @@ -50,6 +50,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 100 AttemptConnPeers: 20 MinPeers: 5 diff --git a/config/protocol.privnet.docker.four.yml b/config/protocol.privnet.docker.four.yml index 05fdd6fa4..058c97a92 100644 --- a/config/protocol.privnet.docker.four.yml +++ b/config/protocol.privnet.docker.four.yml @@ -41,6 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.privnet.docker.one.yml b/config/protocol.privnet.docker.one.yml index f44bc8df5..76932694e 100644 --- a/config/protocol.privnet.docker.one.yml +++ b/config/protocol.privnet.docker.one.yml @@ -41,6 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.privnet.docker.single.yml b/config/protocol.privnet.docker.single.yml index 2e0cdcd7e..62bd270c1 100644 --- a/config/protocol.privnet.docker.single.yml +++ b/config/protocol.privnet.docker.single.yml @@ -35,6 +35,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 0 diff --git a/config/protocol.privnet.docker.three.yml b/config/protocol.privnet.docker.three.yml index ad746a52d..83cda9e1d 100644 --- a/config/protocol.privnet.docker.three.yml +++ b/config/protocol.privnet.docker.three.yml @@ -41,6 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.privnet.docker.two.yml b/config/protocol.privnet.docker.two.yml index b250aab10..be61e6968 100644 --- a/config/protocol.privnet.docker.two.yml +++ b/config/protocol.privnet.docker.two.yml @@ -41,6 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.privnet.yml b/config/protocol.privnet.yml index 87c97d2e0..247faee82 100644 --- a/config/protocol.privnet.yml +++ b/config/protocol.privnet.yml @@ -41,6 +41,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 10 AttemptConnPeers: 5 MinPeers: 3 diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index fe069eaf6..6bc778911 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -50,6 +50,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 100 AttemptConnPeers: 20 MinPeers: 5 diff --git a/config/protocol.unit_testnet.yml b/config/protocol.unit_testnet.yml index 21c8b462d..7fe53358c 100644 --- a/config/protocol.unit_testnet.yml +++ b/config/protocol.unit_testnet.yml @@ -40,6 +40,8 @@ ApplicationConfiguration: Relay: true DialTimeout: 3 ProtoTickInterval: 2 + PingInterval: 60 + PingTimeout: 60 MaxPeers: 50 AttemptConnPeers: 5 MinPeers: 1 diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 5fdc86e5f..a0f151d98 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -153,9 +153,11 @@ var defaultMessageHandler = func(t *testing.T, msg *Message) {} type localPeer struct { netaddr net.TCPAddr version *payload.Version + lastBlockIndex uint32 handshaked bool t *testing.T messageHandler func(t *testing.T, msg *Message) + pingSent int } func newLocalPeer(t *testing.T) *localPeer { @@ -185,6 +187,12 @@ func (p *localPeer) Done() chan error { func (p *localPeer) Version() *payload.Version { return p.version } +func (p *localPeer) LastBlockIndex() uint32 { + return p.lastBlockIndex +} +func (p *localPeer) UpdateLastBlockIndex(newIndex uint32) { + p.lastBlockIndex = newIndex +} func (p *localPeer) HandleVersion(v *payload.Version) error { p.version = v return nil @@ -199,6 +207,12 @@ func (p *localPeer) HandleVersionAck() error { p.handshaked = true return nil } +func (p *localPeer) GetPingSent() int { + return p.pingSent +} +func (p *localPeer) UpdatePingSent(newValue int) { + p.pingSent = newValue +} func (p *localPeer) Handshaked() bool { return p.handshaked diff --git a/pkg/network/message.go b/pkg/network/message.go index 087724967..09ff334ed 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -197,6 +197,8 @@ func (m *Message) decodePayload(br *io.BinReader) error { p = &transaction.Transaction{} case CMDMerkleBlock: p = &payload.MerkleBlock{} + case CMDPing, CMDPong: + p = &payload.Ping{} default: return fmt.Errorf("can't decode command %s", cmdByteArrayToString(m.Command)) } diff --git a/pkg/network/payload/ping.go b/pkg/network/payload/ping.go new file mode 100644 index 000000000..85e0acced --- /dev/null +++ b/pkg/network/payload/ping.go @@ -0,0 +1,40 @@ +package payload + +import ( + "time" + + "github.com/CityOfZion/neo-go/pkg/io" +) + +// Ping payload for ping/pong payloads. +type Ping struct { + // Index of the last block. + LastBlockIndex uint32 + // Timestamp. + Timestamp uint32 + // Nonce of the server. + Nonce uint32 +} + +// NewPing creates new Ping payload. +func NewPing(blockIndex uint32, nonce uint32) *Ping { + return &Ping{ + LastBlockIndex: blockIndex, + Timestamp: uint32(time.Now().UTC().Unix()), + Nonce: nonce, + } +} + +// DecodeBinary implements Serializable interface. +func (p *Ping) DecodeBinary(br *io.BinReader) { + p.LastBlockIndex = br.ReadU32LE() + p.Timestamp = br.ReadU32LE() + p.Nonce = br.ReadU32LE() +} + +// EncodeBinary implements Serializable interface. +func (p *Ping) EncodeBinary(bw *io.BinWriter) { + bw.WriteU32LE(p.LastBlockIndex) + bw.WriteU32LE(p.Timestamp) + bw.WriteU32LE(p.Nonce) +} diff --git a/pkg/network/payload/ping_test.go b/pkg/network/payload/ping_test.go new file mode 100644 index 000000000..c9f8678b8 --- /dev/null +++ b/pkg/network/payload/ping_test.go @@ -0,0 +1,25 @@ +package payload + +import ( + "testing" + + "github.com/CityOfZion/neo-go/pkg/io" + "github.com/stretchr/testify/assert" +) + +func TestEncodeDecodeBinary(t *testing.T) { + payload := NewPing(uint32(1), uint32(2)) + assert.NotEqual(t, 0, payload.Timestamp) + + bufBinWriter := io.NewBufBinWriter() + payload.EncodeBinary(bufBinWriter.BinWriter) + assert.Nil(t, bufBinWriter.Err) + + binReader := io.NewBinReaderFromBuf(bufBinWriter.Bytes()) + decodedPing := &Ping{} + decodedPing.DecodeBinary(binReader) + assert.Nil(t, binReader.Err) + + assert.Equal(t, uint32(1), decodedPing.LastBlockIndex) + assert.Equal(t, uint32(2), decodedPing.Nonce) +} diff --git a/pkg/network/peer.go b/pkg/network/peer.go index f56beab01..2562153fd 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -21,9 +21,13 @@ type Peer interface { WriteMsg(msg *Message) error Done() chan error Version() *payload.Version + LastBlockIndex() uint32 + UpdateLastBlockIndex(lbIndex uint32) Handshaked() bool SendVersion(*Message) error SendVersionAck(*Message) error HandleVersion(*payload.Version) error HandleVersionAck() error + GetPingSent() int + UpdatePingSent(int) } diff --git a/pkg/network/server.go b/pkg/network/server.go index b0b3eef89..72a37a7bc 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -27,6 +27,7 @@ const ( maxBlockBatch = 200 maxAddrsToSend = 200 minPoolCount = 30 + defaultPingLimit = 4 ) var ( @@ -314,7 +315,7 @@ func (s *Server) startProtocol(p Peer) { zap.Uint32("id", p.Version().Nonce)) s.discovery.RegisterGoodAddr(p.PeerAddr().String()) - if s.chain.HeaderHeight() < p.Version().StartHeight { + if s.chain.HeaderHeight() < p.LastBlockIndex() { err = s.requestHeaders(p) if err != nil { p.Disconnect(err) @@ -323,6 +324,7 @@ func (s *Server) startProtocol(p Peer) { } timer := time.NewTimer(s.ProtoTickInterval) + pingTimer := time.NewTimer(s.PingTimeout) for { select { case err = <-p.Done(): @@ -331,12 +333,31 @@ func (s *Server) startProtocol(p Peer) { err = p.WriteMsg(m) case <-timer.C: // Try to sync in headers and block with the peer if his block height is higher then ours. - if p.Version().StartHeight > s.chain.BlockHeight() { + if p.LastBlockIndex() > s.chain.BlockHeight() { err = s.requestBlocks(p) } if err == nil { timer.Reset(s.ProtoTickInterval) } + if s.chain.HeaderHeight() >= p.LastBlockIndex() { + block, errGetBlock := s.chain.GetBlock(s.chain.CurrentBlockHash()) + if errGetBlock != nil { + err = errGetBlock + } else { + diff := uint32(time.Now().UTC().Unix()) - block.Timestamp + if diff > uint32(s.PingInterval/time.Second) { + p.UpdatePingSent(p.GetPingSent() + 1) + err = p.WriteMsg(NewMessage(s.Net, CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight()))) + } + } + } + case <-pingTimer.C: + if p.GetPingSent() > defaultPingLimit { + err = errors.New("ping/pong timeout") + } else { + pingTimer.Reset(s.PingTimeout) + p.UpdatePingSent(0) + } } if err != nil { s.unregister <- peerDrop{p, err} @@ -394,7 +415,7 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) { // The peer will respond with a maximum of 2000 headers in one batch. // We will ask one more batch here if needed. Eventually we will get synced // due to the startProtocol routine that will ask headers every protoTick. - if s.chain.HeaderHeight() < p.Version().StartHeight { + if s.chain.HeaderHeight() < p.LastBlockIndex() { s.requestHeaders(p) } } @@ -404,6 +425,25 @@ func (s *Server) handleBlockCmd(p Peer, block *core.Block) error { return s.bQueue.putBlock(block) } +// handlePing processes ping request. +func (s *Server) handlePing(p Peer, ping *payload.Ping) error { + return p.WriteMsg(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight()))) +} + +// handlePing processes pong request. +func (s *Server) handlePong(p Peer, pong *payload.Ping) error { + pingSent := p.GetPingSent() + if pingSent == 0 { + return errors.New("pong message wasn't expected") + } + p.UpdatePingSent(pingSent - 1) + p.UpdateLastBlockIndex(pong.LastBlockIndex) + if s.chain.HeaderHeight() < pong.LastBlockIndex { + return s.requestHeaders(p) + } + return nil +} + // handleInvCmd processes the received inventory. func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { reqHashes := make([]util.Uint256, 0) @@ -640,6 +680,12 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { case CMDTX: tx := msg.Payload.(*transaction.Transaction) return s.handleTxCmd(tx) + case CMDPing: + ping := msg.Payload.(*payload.Ping) + return s.handlePing(peer, ping) + case CMDPong: + pong := msg.Payload.(*payload.Ping) + return s.handlePong(peer, pong) case CMDVersion, CMDVerack: return fmt.Errorf("received '%s' after the handshake", msg.CommandType()) } diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index 0cfad7f7e..9362aa276 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -52,6 +52,11 @@ type ( // When this is 0, the default interval of 5 seconds will be used. ProtoTickInterval time.Duration + // Interval used in pinging mechanism for syncing blocks. + PingInterval time.Duration + // Time to wait for pong(response for sent ping request). + PingTimeout time.Duration + // Level of the internal logger. LogLevel zapcore.Level @@ -83,6 +88,8 @@ func NewServerConfig(cfg config.Config) ServerConfig { Seeds: protoConfig.SeedList, DialTimeout: appConfig.DialTimeout * time.Second, ProtoTickInterval: appConfig.ProtoTickInterval * time.Second, + PingInterval: appConfig.PingInterval * time.Second, + PingTimeout: appConfig.PingTimeout * time.Second, MaxPeers: appConfig.MaxPeers, AttemptConnPeers: appConfig.AttemptConnPeers, MinPeers: appConfig.MinPeers, diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 78f10862b..b1dd94a82 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -31,6 +31,8 @@ type TCPPeer struct { // The version of the peer. version *payload.Version + // Index of the last block. + lastBlockIndex uint32 lock sync.RWMutex handShake handShakeStage @@ -38,6 +40,9 @@ type TCPPeer struct { done chan error wg sync.WaitGroup + + // number of sent pings. + pingSent int } // NewTCPPeer returns a TCPPeer structure based on the given connection. @@ -103,6 +108,7 @@ func (p *TCPPeer) HandleVersion(version *payload.Version) error { return errors.New("invalid handshake: already received Version") } p.version = version + p.lastBlockIndex = version.StartHeight p.handShake |= versionReceived return nil } @@ -191,3 +197,31 @@ func (p *TCPPeer) Disconnect(err error) { func (p *TCPPeer) Version() *payload.Version { return p.version } + +// LastBlockIndex returns last block index. +func (p *TCPPeer) LastBlockIndex() uint32 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.lastBlockIndex +} + +// UpdateLastBlockIndex updates last block index. +func (p *TCPPeer) UpdateLastBlockIndex(newIndex uint32) { + p.lock.Lock() + defer p.lock.Unlock() + p.lastBlockIndex = newIndex +} + +// GetPingSent returns flag whether ping was sent or not. +func (p *TCPPeer) GetPingSent() int { + p.lock.RLock() + defer p.lock.RUnlock() + return p.pingSent +} + +// UpdatePingSent updates pingSent value. +func (p *TCPPeer) UpdatePingSent(newValue int) { + p.lock.Lock() + defer p.lock.Unlock() + p.pingSent = newValue +}