diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 03340eee9..bb9606240 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -18,6 +18,7 @@ type Discoverer interface { RequestRemote(int) RegisterBadAddr(string) RegisterGoodAddr(string) + RegisterConnectedAddr(string) UnregisterConnectedAddr(string) UnconnectedPeers() []string BadPeers() []string @@ -153,8 +154,8 @@ func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) { d.lock.Unlock() } -// registerConnectedAddr tells discoverer that given address is now connected. -func (d *DefaultDiscovery) registerConnectedAddr(addr string) { +// RegisterConnectedAddr tells discoverer that given address is now connected. +func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) { d.lock.Lock() delete(d.unconnectedAddrs, addr) d.connectedAddrs[addr] = true @@ -166,7 +167,7 @@ func (d *DefaultDiscovery) tryAddress(addr string) { d.RegisterBadAddr(addr) d.RequestRemote(1) } else { - d.registerConnectedAddr(addr) + d.RegisterConnectedAddr(addr) } } diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 3f76a5b30..823d78525 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -136,6 +136,7 @@ func (d testDiscovery) BackFill(addrs ...string) {} func (d testDiscovery) PoolCount() int { return 0 } func (d testDiscovery) RegisterBadAddr(string) {} func (d testDiscovery) RegisterGoodAddr(string) {} +func (d testDiscovery) RegisterConnectedAddr(string) {} func (d testDiscovery) UnregisterConnectedAddr(string) {} func (d testDiscovery) UnconnectedPeers() []string { return []string{} } func (d testDiscovery) RequestRemote(n int) {} @@ -193,6 +194,12 @@ func (p *localPeer) EnqueueMessage(msg *Message) error { func (p *localPeer) EnqueuePacket(m []byte) error { return p.EnqueueHPPacket(m) } +func (p *localPeer) EnqueueP2PMessage(msg *Message) error { + return p.EnqueueMessage(msg) +} +func (p *localPeer) EnqueueP2PPacket(m []byte) error { + return p.EnqueueHPPacket(m) +} func (p *localPeer) EnqueueHPPacket(m []byte) error { msg := &Message{} r := io.NewBinReaderFromBuf(m) diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 332a4f99a..323127b4e 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -30,6 +30,20 @@ type Peer interface { // completed handshaking. EnqueuePacket([]byte) error + // EnqueueP2PMessage is a temporary wrapper that sends a message via + // EnqueueP2PPacket if there is no error in serializing it. + EnqueueP2PMessage(*Message) error + + // EnqueueP2PPacket is a blocking packet enqueuer, it doesn't return until + // it puts given packet into the queue. It accepts a slice of bytes that + // can be shared with other queues (so that message marshalling can be + // done once for all peers). Does nothing is the peer is not yet + // completed handshaking. This queue is intended to be used for unicast + // peer to peer communication that is more important than broadcasts + // (handled by EnqueuePacket), but less important than high-priority + // messages (handled by EnqueueHPPacket). + EnqueueP2PPacket([]byte) error + // EnqueueHPPacket is a blocking high priority packet enqueuer, it // doesn't return until it puts given packet into the high-priority // queue. diff --git a/pkg/network/server.go b/pkg/network/server.go index 89357d632..c62d7f34c 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -214,8 +214,8 @@ func (s *Server) run() { s.lock.Lock() s.peers[p] = true s.lock.Unlock() - s.log.Info("new peer connected", zap.Stringer("addr", p.RemoteAddr())) peerCount := s.PeerCount() + s.log.Info("new peer connected", zap.Stringer("addr", p.RemoteAddr()), zap.Int("peerCount", peerCount)) if peerCount > s.MaxPeers { s.lock.RLock() // Pick a random peer and drop connection to it. @@ -239,7 +239,7 @@ func (s *Server) run() { addr := drop.peer.PeerAddr().String() if drop.reason == errIdenticalID { s.discovery.RegisterBadAddr(addr) - } else { + } else if drop.reason != errAlreadyConnected { s.discovery.UnregisterConnectedAddr(addr) s.discovery.BackFill(addr) } @@ -348,10 +348,15 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { return errIdenticalID } peerAddr := p.PeerAddr().String() + s.discovery.RegisterConnectedAddr(peerAddr) s.lock.RLock() for peer := range s.peers { + if p == peer { + continue + } + ver := peer.Version() // Already connected, drop this connection. - if peer.Handshaked() && peer.PeerAddr().String() == peerAddr && peer.Version().Nonce == version.Nonce { + if ver != nil && ver.Nonce == version.Nonce && peer.PeerAddr().String() == peerAddr { s.lock.RUnlock() return errAlreadyConnected } @@ -384,7 +389,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { // handlePing processes ping request. func (s *Server) handlePing(p Peer, ping *payload.Ping) error { - return p.EnqueueMessage(s.MkMsg(CMDPong, payload.NewPing(s.id, s.chain.BlockHeight()))) + return p.EnqueueP2PMessage(s.MkMsg(CMDPong, payload.NewPing(s.chain.BlockHeight(), s.id))) } // handlePing processes pong request. @@ -426,7 +431,7 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { if inv.Type == payload.ConsensusType { return p.EnqueueHPPacket(pkt) } - return p.EnqueuePacket(pkt) + return p.EnqueueP2PPacket(pkt) } return nil } @@ -458,7 +463,7 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { if inv.Type == payload.ConsensusType { err = p.EnqueueHPPacket(pkt) } else { - err = p.EnqueuePacket(pkt) + err = p.EnqueueP2PPacket(pkt) } } if err != nil { @@ -496,7 +501,7 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error { } payload := payload.NewInventory(payload.BlockType, blockHashes) msg := s.MkMsg(CMDInv, payload) - return p.EnqueueMessage(msg) + return p.EnqueueP2PMessage(msg) } // handleGetHeadersCmd processes the getheaders request. @@ -526,7 +531,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error { return nil } msg := s.MkMsg(CMDHeaders, &resp) - return p.EnqueueMessage(msg) + return p.EnqueueP2PMessage(msg) } // handleConsensusCmd processes received consensus payload. @@ -539,10 +544,12 @@ func (s *Server) handleConsensusCmd(cp *consensus.Payload) error { // handleTxCmd processes received transaction. // It never returns an error. func (s *Server) handleTxCmd(tx *transaction.Transaction) error { - s.consensus.OnTransaction(tx) // It's OK for it to fail for various reasons like tx already existing // in the pool. - _ = s.RelayTxn(tx) + if s.verifyAndPoolTX(tx) == RelaySucceed { + s.consensus.OnTransaction(tx) + go s.broadcastTX(tx) + } return nil } @@ -567,7 +574,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error { netaddr, _ := net.ResolveTCPAddr("tcp", addr) alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts) } - return p.EnqueueMessage(s.MkMsg(CMDAddr, alist)) + return p.EnqueueP2PMessage(s.MkMsg(CMDAddr, alist)) } // requestHeaders sends a getheaders message to the peer. @@ -575,7 +582,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error { func (s *Server) requestHeaders(p Peer) error { start := []util.Uint256{s.chain.CurrentHeaderHash()} payload := payload.NewGetBlocks(start, util.Uint256{}) - return p.EnqueueMessage(s.MkMsg(CMDGetHeaders, payload)) + return p.EnqueueP2PMessage(s.MkMsg(CMDGetHeaders, payload)) } // requestBlocks sends a getdata message to the peer @@ -594,7 +601,7 @@ func (s *Server) requestBlocks(p Peer) error { } if len(hashes) > 0 { payload := payload.NewInventory(payload.BlockType, hashes) - return p.EnqueueMessage(s.MkMsg(CMDGetData, payload)) + return p.EnqueueP2PMessage(s.MkMsg(CMDGetData, payload)) } else if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) } @@ -603,6 +610,10 @@ func (s *Server) requestBlocks(p Peer) error { // handleMessage processes the given message. func (s *Server) handleMessage(peer Peer, msg *Message) error { + s.log.Debug("got msg", + zap.Stringer("addr", peer.RemoteAddr()), + zap.String("type", string(msg.CommandType()))) + // Make sure both server and peer are operating on // the same network. if msg.Magic != s.Net { @@ -727,9 +738,8 @@ func (s *Server) relayBlock(b *block.Block) { s.broadcastMessage(msg) } -// RelayTxn a new transaction to the local node and the connected peers. -// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159 -func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { +// verifyAndPoolTX verifies the TX and adds it to the local mempool. +func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason { if t.Type == transaction.MinerType { return RelayInvalid } @@ -745,7 +755,21 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { if ok := s.chain.GetMemPool().TryAdd(t.Hash(), mempool.NewPoolItem(t, s.chain)); !ok { return RelayOutOfMemory } + return RelaySucceed +} +// RelayTxn a new transaction to the local node and the connected peers. +// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159 +func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { + ret := s.verifyAndPoolTX(t) + if ret == RelaySucceed { + s.broadcastTX(t) + } + return ret +} + +// broadcastTX broadcasts an inventory message about new transaction. +func (s *Server) broadcastTX(t *transaction.Transaction) { msg := s.MkMsg(CMDInv, payload.NewInventory(payload.TXType, []util.Uint256{t.Hash()})) // We need to filter out non-relaying nodes, so plain broadcast @@ -753,6 +777,4 @@ func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { return p.Handshaked() && p.Version().Relay }) - - return RelaySucceed } diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 1dfbbcc44..51d9bfe40 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -2,6 +2,7 @@ package network import ( "errors" + "fmt" "net" "strconv" "sync" @@ -21,6 +22,7 @@ const ( verAckReceived requestQueueSize = 32 + p2pMsgQueueSize = 16 hpRequestQueueSize = 4 ) @@ -46,9 +48,10 @@ type TCPPeer struct { finale sync.Once handShake handShakeStage - done chan struct{} - sendQ chan []byte - hpSendQ chan []byte + done chan struct{} + sendQ chan []byte + p2pSendQ chan []byte + hpSendQ chan []byte wg sync.WaitGroup @@ -60,41 +63,60 @@ type TCPPeer struct { // NewTCPPeer returns a TCPPeer structure based on the given connection. func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { return &TCPPeer{ - conn: conn, - server: s, - done: make(chan struct{}), - sendQ: make(chan []byte, requestQueueSize), - hpSendQ: make(chan []byte, hpRequestQueueSize), + conn: conn, + server: s, + done: make(chan struct{}), + sendQ: make(chan []byte, requestQueueSize), + p2pSendQ: make(chan []byte, p2pMsgQueueSize), + hpSendQ: make(chan []byte, hpRequestQueueSize), } } +// putPacketIntoQueue puts given message into the given queue if the peer has +// done handshaking. +func (p *TCPPeer) putPacketIntoQueue(queue chan<- []byte, msg []byte) error { + if !p.Handshaked() { + return errStateMismatch + } + queue <- msg + return nil +} + // EnqueuePacket implements the Peer interface. func (p *TCPPeer) EnqueuePacket(msg []byte) error { - if !p.Handshaked() { - return errStateMismatch + return p.putPacketIntoQueue(p.sendQ, msg) +} + +// putMessageIntoQueue serializes given Message and puts it into given queue if +// the peer has done handshaking. +func (p *TCPPeer) putMsgIntoQueue(queue chan<- []byte, msg *Message) error { + b, err := msg.Bytes() + if err != nil { + return err } - p.sendQ <- msg - return nil + return p.putPacketIntoQueue(queue, b) } // EnqueueMessage is a temporary wrapper that sends a message via // EnqueuePacket if there is no error in serializing it. func (p *TCPPeer) EnqueueMessage(msg *Message) error { - b, err := msg.Bytes() - if err != nil { - return err - } - return p.EnqueuePacket(b) + return p.putMsgIntoQueue(p.sendQ, msg) +} + +// EnqueueP2PPacket implements the Peer interface. +func (p *TCPPeer) EnqueueP2PPacket(msg []byte) error { + return p.putPacketIntoQueue(p.p2pSendQ, msg) +} + +// EnqueueP2PMessage implements the Peer interface. +func (p *TCPPeer) EnqueueP2PMessage(msg *Message) error { + return p.putMsgIntoQueue(p.p2pSendQ, msg) } // EnqueueHPPacket implements the Peer interface. It the peer is not yet // handshaked it's a noop. func (p *TCPPeer) EnqueueHPPacket(msg []byte) error { - if !p.Handshaked() { - return errStateMismatch - } - p.hpSendQ <- msg - return nil + return p.putPacketIntoQueue(p.hpSendQ, msg) } func (p *TCPPeer) writeMsg(msg *Message) error { @@ -131,6 +153,9 @@ func (p *TCPPeer) handleConn() { break } if err = p.server.handleMessage(p, msg); err != nil { + if p.Handshaked() { + err = fmt.Errorf("handling %s message: %v", msg.CommandType(), err) + } break } } @@ -142,6 +167,11 @@ func (p *TCPPeer) handleConn() { // send queues. func (p *TCPPeer) handleQueues() { var err error + // p2psend queue shares its time with send queue in around + // ((p2pSkipDivisor - 1) * 2 + 1)/1 ratio, ratio because the third + // select can still choose p2psend over send. + var p2pSkipCounter uint32 + const p2pSkipDivisor = 4 for { var msg []byte @@ -154,13 +184,25 @@ func (p *TCPPeer) handleQueues() { default: } - // If there is no message in the hp queue, block until one + // Skip this select every p2pSkipDivisor iteration. + if msg == nil && p2pSkipCounter%p2pSkipDivisor != 0 { + // Then look at the p2p queue. + select { + case <-p.done: + return + case msg = <-p.hpSendQ: + case msg = <-p.p2pSendQ: + default: + } + } + // If there is no message in HP or P2P queues, block until one // appears in any of the queues. if msg == nil { select { case <-p.done: return case msg = <-p.hpSendQ: + case msg = <-p.p2pSendQ: case msg = <-p.sendQ: } } @@ -168,6 +210,7 @@ func (p *TCPPeer) handleQueues() { if err != nil { break } + p2pSkipCounter++ } p.Disconnect(err) } @@ -297,7 +340,7 @@ func (p *TCPPeer) RemoteAddr() net.Addr { func (p *TCPPeer) PeerAddr() net.Addr { remote := p.conn.RemoteAddr() // The network can be non-tcp in unit tests. - if !p.Handshaked() || remote.Network() != "tcp" { + if p.version == nil || remote.Network() != "tcp" { return p.RemoteAddr() } host, _, err := net.SplitHostPort(remote.String())