package network import ( "crypto/rand" "encoding/binary" "errors" "fmt" mrand "math/rand" "net" "strconv" "sync" "time" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/services/notary" "github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" "go.uber.org/zap" ) const ( // peer numbers are arbitrary at the moment. defaultMinPeers = 5 defaultAttemptConnPeers = 20 defaultMaxPeers = 100 maxBlockBatch = 200 minPoolCount = 30 ) var ( errAlreadyConnected = errors.New("already connected") errIdenticalID = errors.New("identical node id") errInvalidHandshake = errors.New("invalid handshake") errInvalidNetwork = errors.New("invalid network") errMaxPeers = errors.New("max peers reached") errServerShutdown = errors.New("server shutdown") errInvalidInvType = errors.New("invalid inventory type") errInvalidHashStart = errors.New("invalid requested HashStart") ) type ( // Server represents the local Node in the network. Its transport could // be of any kind. Server struct { // ServerConfig holds the Server configuration. ServerConfig // id also known as the nonce of the server. id uint32 // Network's magic number for correct message decoding. network netmode.Magic // stateRootInHeader specifies if block header contain state root. stateRootInHeader bool transport Transporter discovery Discoverer chain blockchainer.Blockchainer bQueue *blockQueue consensus consensus.Service notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer notaryModule *notary.Notary lock sync.RWMutex peers map[Peer]bool // lastRequestedHeight contains last requested height. lastRequestedHeight atomic.Uint32 register chan Peer unregister chan peerDrop quit chan struct{} transactions chan *transaction.Transaction consensusStarted *atomic.Bool canHandleExtens *atomic.Bool oracle *oracle.Oracle log *zap.Logger } peerDrop struct { peer Peer reason error } ) func randomID() uint32 { buf := make([]byte, 4) _, _ = rand.Read(buf) return binary.BigEndian.Uint32(buf) } // NewServer returns a new Server, initialized with the given configuration. func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Logger) (*Server, error) { return newServerFromConstructors(config, chain, log, func(s *Server) Transporter { return NewTCPTransport(s, net.JoinHostPort(s.ServerConfig.Address, strconv.Itoa(int(s.ServerConfig.Port))), s.log) }, consensus.NewService, newDefaultDiscovery) } func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Logger, newTransport func(*Server) Transporter, newConsensus func(consensus.Config) (consensus.Service, error), newDiscovery func([]string, time.Duration, Transporter) Discoverer, ) (*Server, error) { if log == nil { return nil, errors.New("logger is a required parameter") } s := &Server{ ServerConfig: config, chain: chain, id: randomID(), network: chain.GetConfig().Magic, stateRootInHeader: chain.GetConfig().StateRootInHeader, quit: make(chan struct{}), register: make(chan Peer), unregister: make(chan peerDrop), peers: make(map[Peer]bool), consensusStarted: atomic.NewBool(false), canHandleExtens: atomic.NewBool(false), extensiblePool: extpool.New(chain), log: log, transactions: make(chan *transaction.Transaction, 64), } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1, chain.GetConfig().P2PNotary.Enabled) chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, txpool *mempool.Pool, _ *block.Block) { s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool { return bc.IsTxStillRelevant(t, txpool, true) }, s.notaryFeer) }) if chain.GetConfig().P2PNotary.Enabled { n, err := notary.NewNotary(chain, s.notaryRequestPool, s.log, func(tx *transaction.Transaction) error { r := s.RelayTxn(tx) if r != RelaySucceed { return fmt.Errorf("can't pool notary tx: hash %s, reason: %d", tx.Hash().StringLE(), byte(r)) } return nil }) if err != nil { return nil, fmt.Errorf("failed to create Notary module: %w", err) } s.notaryModule = n chain.SetNotary(n) chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, pool *mempool.Pool, b *block.Block) { s.notaryModule.PostPersist(bc, pool, b) }) } } else if chain.GetConfig().P2PNotary.Enabled { return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable") } s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) { if !s.consensusStarted.Load() { s.tryStartConsensus() } }) if config.OracleCfg.Enabled { orcCfg := oracle.Config{ Log: log, Network: config.Net, MainCfg: config.OracleCfg, Chain: chain, } orc, err := oracle.NewOracle(orcCfg) if err != nil { return nil, fmt.Errorf("can't initialize Oracle module: %w", err) } orc.SetOnTransaction(func(tx *transaction.Transaction) { r := s.RelayTxn(tx) if r != RelaySucceed { orc.Log.Error("can't pool oracle tx", zap.String("hash", tx.Hash().StringLE()), zap.Uint8("reason", byte(r))) } }) s.oracle = orc chain.SetOracle(orc) } srv, err := newConsensus(consensus.Config{ Logger: log, Broadcast: s.handleNewPayload, Chain: chain, RequestTx: s.requestTx, Wallet: config.Wallet, TimePerBlock: config.TimePerBlock, }) if err != nil { return nil, err } s.consensus = srv if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), zap.Int("actual", defaultMinPeers)) s.MinPeers = defaultMinPeers } if s.MaxPeers <= 0 { s.log.Info("bad MaxPeers configured, using the default value", zap.Int("configured", s.MaxPeers), zap.Int("actual", defaultMaxPeers)) s.MaxPeers = defaultMaxPeers } if s.AttemptConnPeers <= 0 { s.log.Info("bad AttemptConnPeers configured, using the default value", zap.Int("configured", s.AttemptConnPeers), zap.Int("actual", defaultAttemptConnPeers)) s.AttemptConnPeers = defaultAttemptConnPeers } s.transport = newTransport(s) s.discovery = newDiscovery( s.Seeds, s.DialTimeout, s.transport, ) return s, nil } // ID returns the servers ID. func (s *Server) ID() uint32 { return s.id } // Start will start the server and its underlying transport. func (s *Server) Start(errChan chan error) { s.log.Info("node started", zap.Uint32("blockHeight", s.chain.BlockHeight()), zap.Uint32("headerHeight", s.chain.HeaderHeight())) s.tryStartConsensus() s.initStaleMemPools() go s.broadcastTxLoop() if s.oracle != nil { go s.oracle.Run() } if s.notaryModule != nil { s.notaryRequestPool.RunSubscriptions() go s.notaryModule.Run() } go s.relayBlocksLoop() go s.bQueue.run() go s.transport.Accept() setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10)) s.run() } // Shutdown disconnects all peers and stops listening. func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) s.transport.Close() s.discovery.Close() if s.consensusStarted.Load() { s.consensus.Shutdown() } for p := range s.Peers() { p.Disconnect(errServerShutdown) } s.bQueue.discard() if s.oracle != nil { s.oracle.Shutdown() } if s.notaryModule != nil { s.notaryModule.Stop() s.notaryRequestPool.StopSubscriptions() } close(s.quit) } // GetOracle returns oracle module instance. func (s *Server) GetOracle() *oracle.Oracle { return s.oracle } // UnconnectedPeers returns a list of peers that are in the discovery peer list // but are not connected to the server. func (s *Server) UnconnectedPeers() []string { return s.discovery.UnconnectedPeers() } // BadPeers returns a list of peers the are flagged as "bad" peers. func (s *Server) BadPeers() []string { return s.discovery.BadPeers() } // ConnectedPeers returns a list of currently connected peers. func (s *Server) ConnectedPeers() []string { s.lock.RLock() defer s.lock.RUnlock() peers := make([]string, 0, len(s.peers)) for k := range s.peers { peers = append(peers, k.PeerAddr().String()) } return peers } // run is a goroutine that starts another goroutine to manage protocol specifics // while itself dealing with peers management (handling connects/disconnects). func (s *Server) run() { go s.runProto() for { if s.PeerCount() < s.MinPeers { s.discovery.RequestRemote(s.AttemptConnPeers) } if s.discovery.PoolCount() < minPoolCount { s.broadcastHPMessage(NewMessage(CMDGetAddr, payload.NewNullPayload())) } select { case <-s.quit: return case p := <-s.register: s.lock.Lock() s.peers[p] = true s.lock.Unlock() peerCount := s.PeerCount() s.log.Info("new peer connected", zap.Stringer("addr", p.RemoteAddr()), zap.Int("peerCount", peerCount)) if peerCount > s.MaxPeers { s.lock.RLock() // Pick a random peer and drop connection to it. for peer := range s.peers { // It will send us unregister signal. go peer.Disconnect(errMaxPeers) break } s.lock.RUnlock() } updatePeersConnectedMetric(s.PeerCount()) case drop := <-s.unregister: s.lock.Lock() if s.peers[drop.peer] { delete(s.peers, drop.peer) s.lock.Unlock() s.log.Warn("peer disconnected", zap.Stringer("addr", drop.peer.RemoteAddr()), zap.String("reason", drop.reason.Error()), zap.Int("peerCount", s.PeerCount())) addr := drop.peer.PeerAddr().String() if drop.reason == errIdenticalID { s.discovery.RegisterBadAddr(addr) } else if drop.reason == errAlreadyConnected { // There is a race condition when peer can be disconnected twice for the this reason // which can lead to no connections to peer at all. Here we check for such a possibility. stillConnected := false s.lock.RLock() verDrop := drop.peer.Version() addr := drop.peer.PeerAddr().String() if verDrop != nil { for peer := range s.peers { ver := peer.Version() // Already connected, drop this connection. if ver != nil && ver.Nonce == verDrop.Nonce && peer.PeerAddr().String() == addr { stillConnected = true } } } s.lock.RUnlock() if !stillConnected { s.discovery.UnregisterConnectedAddr(addr) s.discovery.BackFill(addr) } } else { s.discovery.UnregisterConnectedAddr(addr) s.discovery.BackFill(addr) } updatePeersConnectedMetric(s.PeerCount()) } else { // else the peer is already gone, which can happen // because we have two goroutines sending signals here s.lock.Unlock() } } } } // runProto is a goroutine that manages server-wide protocol events. func (s *Server) runProto() { pingTimer := time.NewTimer(s.PingInterval) for { prevHeight := s.chain.BlockHeight() select { case <-s.quit: return case <-pingTimer.C: if s.chain.BlockHeight() == prevHeight { // Get a copy of s.peers to avoid holding a lock while sending. for peer := range s.Peers() { _ = peer.SendPing(NewMessage(CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight()))) } } pingTimer.Reset(s.PingInterval) } } } func (s *Server) tryStartConsensus() { if s.Wallet == nil || s.consensusStarted.Load() { return } if s.IsInSync() { s.log.Info("node reached synchronized state, starting consensus") if s.consensusStarted.CAS(false, true) { s.consensus.Start() } } } // Peers returns the current list of peers connected to // the server. func (s *Server) Peers() map[Peer]bool { s.lock.RLock() defer s.lock.RUnlock() peers := make(map[Peer]bool, len(s.peers)) for k, v := range s.peers { peers[k] = v } return peers } // PeerCount returns the number of current connected peers. func (s *Server) PeerCount() int { s.lock.RLock() defer s.lock.RUnlock() return len(s.peers) } // HandshakedPeersCount returns the number of connected peers // which have already performed handshake. func (s *Server) HandshakedPeersCount() int { s.lock.RLock() defer s.lock.RUnlock() var count int for p := range s.peers { if p.Handshaked() { count++ } } return count } // getVersionMsg returns current version message. func (s *Server) getVersionMsg() (*Message, error) { port, err := s.Port() if err != nil { return nil, err } capabilities := []capability.Capability{ { Type: capability.TCPServer, Data: &capability.Server{ Port: port, }, }, } if s.Relay { capabilities = append(capabilities, capability.Capability{ Type: capability.FullNode, Data: &capability.Node{ StartHeight: s.chain.BlockHeight(), }, }) } payload := payload.NewVersion( s.Net, s.id, s.UserAgent, capabilities, ) return NewMessage(CMDVersion, payload), nil } // IsInSync answers the question of whether the server is in sync with the // network or not (at least how the server itself sees it). The server operates // with the data that it has, the number of peers (that has to be more than // minimum number) and height of these peers (our chain has to be not lower // than 2/3 of our peers have). Ideally we would check for the highest of the // peers, but the problem is that they can lie to us and send whatever height // they want to. func (s *Server) IsInSync() bool { var peersNumber int var notHigher int if s.MinPeers == 0 { return true } ourLastBlock := s.chain.BlockHeight() s.lock.RLock() for p := range s.peers { if p.Handshaked() { peersNumber++ if ourLastBlock >= p.LastBlockIndex() { notHigher++ } } } s.lock.RUnlock() // Checking bQueue would also be nice, but it can be filled with garbage // easily at the moment. return peersNumber >= s.MinPeers && (3*notHigher > 2*peersNumber) // && s.bQueue.length() == 0 } // When a peer sends out his version we reply with verack after validating // the version. func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { err := p.HandleVersion(version) if err != nil { return err } if s.id == version.Nonce { return errIdenticalID } // Make sure both server and peer are operating on // the same network. if s.Net != version.Magic { return errInvalidNetwork } peerAddr := p.PeerAddr().String() s.discovery.RegisterConnectedAddr(peerAddr) s.lock.RLock() for peer := range s.peers { if p == peer { continue } ver := peer.Version() // Already connected, drop this connection. if ver != nil && ver.Nonce == version.Nonce && peer.PeerAddr().String() == peerAddr { s.lock.RUnlock() return errAlreadyConnected } } s.lock.RUnlock() return p.SendVersionAck(NewMessage(CMDVerack, payload.NewNullPayload())) } // handleBlockCmd processes the received block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { return s.bQueue.putBlock(block) } // handlePing processes ping request. func (s *Server) handlePing(p Peer, ping *payload.Ping) error { err := p.HandlePing(ping) if err != nil { return err } if s.chain.BlockHeight() < ping.LastBlockIndex { err = s.requestBlocks(p) if err != nil { return err } } return p.EnqueueP2PMessage(NewMessage(CMDPong, payload.NewPing(s.chain.BlockHeight(), s.id))) } // handlePing processes pong request. func (s *Server) handlePong(p Peer, pong *payload.Ping) error { err := p.HandlePong(pong) if err != nil { return err } if s.chain.BlockHeight() < pong.LastBlockIndex { return s.requestBlocks(p) } return nil } // handleInvCmd processes the received inventory. func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { reqHashes := make([]util.Uint256, 0) var typExists = map[payload.InventoryType]func(util.Uint256) bool{ payload.TXType: s.chain.HasTransaction, payload.BlockType: s.chain.HasBlock, payload.ExtensibleType: func(h util.Uint256) bool { cp := s.extensiblePool.Get(h) return cp != nil }, payload.P2PNotaryRequestType: func(h util.Uint256) bool { return s.notaryRequestPool.ContainsKey(h) }, } if exists := typExists[inv.Type]; exists != nil { for _, hash := range inv.Hashes { if !exists(hash) { reqHashes = append(reqHashes, hash) } } } if len(reqHashes) > 0 { msg := NewMessage(CMDGetData, payload.NewInventory(inv.Type, reqHashes)) pkt, err := msg.Bytes() if err != nil { return err } if inv.Type == payload.ExtensibleType { return p.EnqueueHPPacket(true, pkt) } return p.EnqueueP2PPacket(pkt) } return nil } // handleMempoolCmd handles getmempool command. func (s *Server) handleMempoolCmd(p Peer) error { txs := s.chain.GetMemPool().GetVerifiedTransactions() hs := make([]util.Uint256, 0, payload.MaxHashesCount) for i := range txs { hs = append(hs, txs[i].Hash()) if len(hs) < payload.MaxHashesCount && i != len(txs)-1 { continue } msg := NewMessage(CMDInv, payload.NewInventory(payload.TXType, hs)) err := p.EnqueueP2PMessage(msg) if err != nil { return err } hs = hs[:0] } return nil } // handleInvCmd processes the received inventory. func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { var notFound []util.Uint256 for _, hash := range inv.Hashes { var msg *Message switch inv.Type { case payload.TXType: tx, _, err := s.chain.GetTransaction(hash) if err == nil { msg = NewMessage(CMDTX, tx) } else { notFound = append(notFound, hash) } case payload.BlockType: b, err := s.chain.GetBlock(hash) if err == nil { msg = NewMessage(CMDBlock, b) } else { notFound = append(notFound, hash) } case payload.ExtensibleType: if cp := s.extensiblePool.Get(hash); cp != nil { msg = NewMessage(CMDExtensible, cp) } case payload.P2PNotaryRequestType: if nrp, ok := s.notaryRequestPool.TryGetData(hash); ok { // already have checked P2PSigExtEnabled msg = NewMessage(CMDP2PNotaryRequest, nrp.(*payload.P2PNotaryRequest)) } else { notFound = append(notFound, hash) } } if msg != nil { pkt, err := msg.Bytes() if err == nil { if inv.Type == payload.ExtensibleType { err = p.EnqueueHPPacket(true, pkt) } else { err = p.EnqueueP2PPacket(pkt) } } if err != nil { return err } } } if len(notFound) != 0 { return p.EnqueueP2PMessage(NewMessage(CMDNotFound, payload.NewInventory(inv.Type, notFound))) } return nil } // handleGetBlocksCmd processes the getblocks request. func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error { count := gb.Count if gb.Count < 0 || gb.Count > payload.MaxHashesCount { count = payload.MaxHashesCount } start, err := s.chain.GetHeader(gb.HashStart) if err != nil { return err } blockHashes := make([]util.Uint256, 0) for i := start.Index + 1; i <= start.Index+uint32(count); i++ { hash := s.chain.GetHeaderHash(int(i)) if hash.Equals(util.Uint256{}) { break } blockHashes = append(blockHashes, hash) } if len(blockHashes) == 0 { return nil } payload := payload.NewInventory(payload.BlockType, blockHashes) msg := NewMessage(CMDInv, payload) return p.EnqueueP2PMessage(msg) } // handleGetBlockByIndexCmd processes the getblockbyindex request. func (s *Server) handleGetBlockByIndexCmd(p Peer, gbd *payload.GetBlockByIndex) error { count := gbd.Count if gbd.Count < 0 || gbd.Count > payload.MaxHashesCount { count = payload.MaxHashesCount } for i := gbd.IndexStart; i < gbd.IndexStart+uint32(count); i++ { hash := s.chain.GetHeaderHash(int(i)) if hash.Equals(util.Uint256{}) { break } b, err := s.chain.GetBlock(hash) if err != nil { break } msg := NewMessage(CMDBlock, b) if err = p.EnqueueP2PMessage(msg); err != nil { return err } } return nil } // handleGetHeadersCmd processes the getheaders request. func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error { if gh.IndexStart > s.chain.HeaderHeight() { return nil } count := gh.Count if gh.Count < 0 || gh.Count > payload.MaxHeadersAllowed { count = payload.MaxHeadersAllowed } resp := payload.Headers{} resp.Hdrs = make([]*block.Header, 0, count) for i := gh.IndexStart; i < gh.IndexStart+uint32(count); i++ { hash := s.chain.GetHeaderHash(int(i)) if hash.Equals(util.Uint256{}) { break } header, err := s.chain.GetHeader(hash) if err != nil { break } resp.Hdrs = append(resp.Hdrs, header) } if len(resp.Hdrs) == 0 { return nil } msg := NewMessage(CMDHeaders, &resp) return p.EnqueueP2PMessage(msg) } // handleExtensibleCmd processes received extensible payload. func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { if !s.canHandleExtens.Load() { if !s.IsInSync() { return nil } s.canHandleExtens.Store(true) } ok, err := s.extensiblePool.Add(e) if err != nil { return err } if !ok { // payload is already in cache return nil } switch e.Category { case consensus.Category: s.consensus.OnPayload(e) default: return errors.New("invalid category") } msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) if e.Category == consensus.Category { s.broadcastHPMessage(msg) } else { s.broadcastMessage(msg) } return nil } // handleTxCmd processes received transaction. // It never returns an error. func (s *Server) handleTxCmd(tx *transaction.Transaction) error { // It's OK for it to fail for various reasons like tx already existing // in the pool. if s.verifyAndPoolTX(tx) == RelaySucceed { s.consensus.OnTransaction(tx) s.broadcastTX(tx, nil) } return nil } // handleP2PNotaryRequestCmd process received P2PNotaryRequest payload. func (s *Server) handleP2PNotaryRequestCmd(r *payload.P2PNotaryRequest) error { if !s.chain.P2PSigExtensionsEnabled() { return errors.New("P2PNotaryRequestCMD was received, but P2PSignatureExtensions are disabled") } if s.verifyAndPoolNotaryRequest(r) == RelaySucceed { s.broadcastP2PNotaryRequestPayload(nil, r) } return nil } // verifyAndPoolNotaryRequest verifies NotaryRequest payload and adds it to the payload mempool. func (s *Server) verifyAndPoolNotaryRequest(r *payload.P2PNotaryRequest) RelayReason { if err := s.chain.PoolTxWithData(r.FallbackTransaction, r, s.notaryRequestPool, s.notaryFeer, verifyNotaryRequest); err != nil { switch { case errors.Is(err, core.ErrAlreadyExists): return RelayAlreadyExists case errors.Is(err, core.ErrOOM): return RelayOutOfMemory case errors.Is(err, core.ErrPolicy): return RelayPolicyFail default: return RelayInvalid } } return RelaySucceed } // verifyNotaryRequest is a function for state-dependant P2PNotaryRequest payload verification which is executed before ordinary blockchain's verification. func verifyNotaryRequest(bc blockchainer.Blockchainer, _ *transaction.Transaction, data interface{}) error { r := data.(*payload.P2PNotaryRequest) payer := r.FallbackTransaction.Signers[1].Account if err := bc.VerifyWitness(payer, r, &r.Witness, bc.GetPolicer().GetMaxVerificationGAS()); err != nil { return fmt.Errorf("bad P2PNotaryRequest payload witness: %w", err) } notaryHash := bc.GetNotaryContractScriptHash() if r.FallbackTransaction.Sender() != notaryHash { return errors.New("P2PNotary contract should be a sender of the fallback transaction") } depositExpiration := bc.GetNotaryDepositExpiration(payer) if r.FallbackTransaction.ValidUntilBlock >= depositExpiration { return fmt.Errorf("fallback transaction is valid after deposit is unlocked: ValidUntilBlock is %d, deposit lock expires at %d", r.FallbackTransaction.ValidUntilBlock, depositExpiration) } return nil } func (s *Server) broadcastP2PNotaryRequestPayload(_ *transaction.Transaction, data interface{}) { r := data.(payload.P2PNotaryRequest) // we can guarantee that cast is successful msg := NewMessage(CMDInv, payload.NewInventory(payload.P2PNotaryRequestType, []util.Uint256{r.FallbackTransaction.Hash()})) s.broadcastMessage(msg) } // handleAddrCmd will process received addresses. func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error { if !p.CanProcessAddr() { return errors.New("unexpected addr received") } for _, a := range addrs.Addrs { addr, err := a.GetTCPAddress() if err == nil { s.discovery.BackFill(addr) } } return nil } // handleGetAddrCmd sends to the peer some good addresses that we know of. func (s *Server) handleGetAddrCmd(p Peer) error { addrs := s.discovery.GoodPeers() if len(addrs) > payload.MaxAddrsCount { addrs = addrs[:payload.MaxAddrsCount] } alist := payload.NewAddressList(len(addrs)) ts := time.Now() for i, addr := range addrs { // we know it's a good address, so it can't fail netaddr, _ := net.ResolveTCPAddr("tcp", addr.Address) alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts, addr.Capabilities) } return p.EnqueueP2PMessage(NewMessage(CMDAddr, alist)) } // requestBlocks sends a CMDGetBlockByIndex message to the peer // to sync up in blocks. A maximum of maxBlockBatch will // send at once. Two things we need to take care of: // 1. If possible, blocks should be fetched in parallel. // height..+500 to one peer, height+500..+1000 to another etc. // 2. Every block must eventually be fetched even if peer sends no answer. // Thus the following algorithm is used: // 1. Block range is divided into chunks of payload.MaxHashesCount. // 2. Send requests for chunk in increasing order. // 3. After all requests were sent, request random height. func (s *Server) requestBlocks(p Peer) error { var currHeight = s.chain.BlockHeight() var peerHeight = p.LastBlockIndex() var needHeight uint32 // lastRequestedHeight can only be increased. for { old := s.lastRequestedHeight.Load() if old <= currHeight { needHeight = currHeight + 1 if !s.lastRequestedHeight.CAS(old, needHeight) { continue } } else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) { needHeight = currHeight + 1 if peerHeight > old+payload.MaxHashesCount { needHeight = old + payload.MaxHashesCount if !s.lastRequestedHeight.CAS(old, needHeight) { continue } } } else { index := mrand.Intn(blockCacheSize / payload.MaxHashesCount) needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount) } break } payload := payload.NewGetBlockByIndex(needHeight, -1) return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload)) } // handleMessage processes the given message. func (s *Server) handleMessage(peer Peer, msg *Message) error { s.log.Debug("got msg", zap.Stringer("addr", peer.RemoteAddr()), zap.String("type", msg.Command.String())) if peer.Handshaked() { if inv, ok := msg.Payload.(*payload.Inventory); ok { if !inv.Type.Valid(s.chain.P2PSigExtensionsEnabled()) || len(inv.Hashes) == 0 { return errInvalidInvType } } switch msg.Command { case CMDAddr: addrs := msg.Payload.(*payload.AddressList) return s.handleAddrCmd(peer, addrs) case CMDGetAddr: // it has no payload return s.handleGetAddrCmd(peer) case CMDGetBlocks: gb := msg.Payload.(*payload.GetBlocks) return s.handleGetBlocksCmd(peer, gb) case CMDGetBlockByIndex: gbd := msg.Payload.(*payload.GetBlockByIndex) return s.handleGetBlockByIndexCmd(peer, gbd) case CMDGetData: inv := msg.Payload.(*payload.Inventory) return s.handleGetDataCmd(peer, inv) case CMDGetHeaders: gh := msg.Payload.(*payload.GetBlockByIndex) return s.handleGetHeadersCmd(peer, gh) case CMDInv: inventory := msg.Payload.(*payload.Inventory) return s.handleInvCmd(peer, inventory) case CMDMempool: // no payload return s.handleMempoolCmd(peer) case CMDBlock: block := msg.Payload.(*block.Block) return s.handleBlockCmd(peer, block) case CMDExtensible: cp := msg.Payload.(*payload.Extensible) return s.handleExtensibleCmd(cp) case CMDTX: tx := msg.Payload.(*transaction.Transaction) return s.handleTxCmd(tx) case CMDP2PNotaryRequest: r := msg.Payload.(*payload.P2PNotaryRequest) return s.handleP2PNotaryRequestCmd(r) case CMDPing: ping := msg.Payload.(*payload.Ping) return s.handlePing(peer, ping) case CMDPong: pong := msg.Payload.(*payload.Ping) return s.handlePong(peer, pong) case CMDVersion, CMDVerack: return fmt.Errorf("received '%s' after the handshake", msg.Command.String()) } } else { switch msg.Command { case CMDVersion: version := msg.Payload.(*payload.Version) return s.handleVersionCmd(peer, version) case CMDVerack: err := peer.HandleVersionAck() if err != nil { return err } go peer.StartProtocol() s.tryStartConsensus() default: return fmt.Errorf("received '%s' during handshake", msg.Command.String()) } } return nil } func (s *Server) handleNewPayload(p *payload.Extensible) { _, err := s.extensiblePool.Add(p) if err != nil { s.log.Error("created payload is not valid", zap.Error(err)) return } msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{p.Hash()})) // It's high priority because it directly affects consensus process, // even though it's just an inv. s.broadcastHPMessage(msg) } func (s *Server) requestTx(hashes ...util.Uint256) { if len(hashes) == 0 { return } for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ { start := i * payload.MaxHashesCount stop := (i + 1) * payload.MaxHashesCount if stop > len(hashes) { stop = len(hashes) } if start == stop { break } msg := NewMessage(CMDGetData, payload.NewInventory(payload.TXType, hashes[start:stop])) // It's high priority because it directly affects consensus process, // even though it's getdata. s.broadcastHPMessage(msg) } } // iteratePeersWithSendMsg sends given message to all peers using two functions // passed, one is to send the message and the other is to filtrate peers (the // peer is considered invalid if it returns false). func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) { // Get a copy of s.peers to avoid holding a lock while sending. peers := s.Peers() if len(peers) == 0 { return } pkt, err := msg.Bytes() if err != nil { return } success := make(map[Peer]bool, len(peers)) okCount := 0 sentCount := 0 for peer := range peers { if peerOK != nil && !peerOK(peer) { success[peer] = false continue } okCount++ if err := send(peer, false, pkt); err != nil { continue } if msg.Command == CMDGetAddr { peer.AddGetAddrSent() } success[peer] = true sentCount++ } // Send to at least 2/3 of good peers. if 3*sentCount >= 2*okCount { return } // Perform blocking send now. for peer := range peers { if _, ok := success[peer]; ok || peerOK != nil && !peerOK(peer) { continue } if err := send(peer, true, pkt); err != nil { continue } if msg.Command == CMDGetAddr { peer.AddGetAddrSent() } sentCount++ if 3*sentCount >= 2*okCount { return } } } // broadcastMessage sends the message to all available peers. func (s *Server) broadcastMessage(msg *Message) { s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, nil) } // broadcastHPMessage sends the high-priority message to all available peers. func (s *Server) broadcastHPMessage(msg *Message) { s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil) } // relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them // to the network. Intended to be run as a separate goroutine. func (s *Server) relayBlocksLoop() { ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays. s.chain.SubscribeForBlocks(ch) for { select { case <-s.quit: s.chain.UnsubscribeFromBlocks(ch) return case b := <-ch: msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) // Filter out nodes that are more current (avoid spamming the network // during initial sync). s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { return p.Handshaked() && p.LastBlockIndex() < b.Index }) s.extensiblePool.RemoveStale(b.Index) } } } // verifyAndPoolTX verifies the TX and adds it to the local mempool. func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason { if err := s.chain.PoolTx(t); err != nil { switch { case errors.Is(err, core.ErrAlreadyExists): return RelayAlreadyExists case errors.Is(err, core.ErrOOM): return RelayOutOfMemory case errors.Is(err, core.ErrPolicy): return RelayPolicyFail default: return RelayInvalid } } return RelaySucceed } // RelayTxn a new transaction to the local node and the connected peers. // Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159 func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason { ret := s.verifyAndPoolTX(t) if ret == RelaySucceed { s.broadcastTX(t, nil) } return ret } // broadcastTX broadcasts an inventory message about new transaction. func (s *Server) broadcastTX(t *transaction.Transaction, _ interface{}) { select { case s.transactions <- t: case <-s.quit: } } func (s *Server) broadcastTxHashes(hs []util.Uint256) { msg := NewMessage(CMDInv, payload.NewInventory(payload.TXType, hs)) // We need to filter out non-relaying nodes, so plain broadcast // functions don't fit here. s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, Peer.IsFullNode) } // initStaleMemPools initializes mempools for stale tx/payload processing. func (s *Server) initStaleMemPools() { cfg := s.chain.GetConfig() threshold := 5 if cfg.ValidatorsCount*2 > threshold { threshold = cfg.ValidatorsCount * 2 } mp := s.chain.GetMemPool() mp.SetResendThreshold(uint32(threshold), s.broadcastTX) if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.SetResendThreshold(uint32(threshold), s.broadcastP2PNotaryRequestPayload) } } // broadcastTxLoop is a loop for batching and sending // transactions hashes in an INV payload. func (s *Server) broadcastTxLoop() { const ( batchTime = time.Millisecond * 50 batchSize = 32 ) txs := make([]util.Uint256, 0, batchSize) var timer *time.Timer timerCh := func() <-chan time.Time { if timer == nil { return nil } return timer.C } broadcast := func() { s.broadcastTxHashes(txs) txs = txs[:0] if timer != nil { timer.Stop() } } for { select { case <-s.quit: loop: for { select { case <-s.transactions: default: break loop } } return case <-timerCh(): if len(txs) > 0 { broadcast() } case tx := <-s.transactions: if len(txs) == 0 { timer = time.NewTimer(batchTime) } txs = append(txs, tx.Hash()) if len(txs) == batchSize { broadcast() } } } } // Port returns actual server port. It may differs from that of server.Config. func (s *Server) Port() (uint16, error) { var port uint16 _, portStr, err := net.SplitHostPort(s.transport.Address()) if err != nil { port = s.ServerConfig.Port } else { p, err := strconv.ParseUint(portStr, 10, 16) if err != nil { return 0, err } port = uint16(p) } return port, nil }