2018-01-26 18:04:13 +00:00
package network
import (
2019-11-15 17:42:23 +00:00
"crypto/rand"
"encoding/binary"
2018-03-14 09:36:59 +00:00
"errors"
2018-01-26 18:04:13 +00:00
"fmt"
2020-11-26 15:53:10 +00:00
mrand "math/rand"
2019-09-13 17:38:34 +00:00
"net"
2019-10-29 17:51:17 +00:00
"strconv"
2018-03-14 09:36:59 +00:00
"sync"
2018-01-28 10:12:05 +00:00
"time"
2018-01-27 15:00:28 +00:00
2020-06-18 09:00:51 +00:00
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/consensus"
"github.com/nspcc-dev/neo-go/pkg/core/block"
2020-04-08 10:56:04 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
2020-11-27 10:55:48 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
2021-05-28 11:55:06 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
2020-05-22 09:17:17 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/capability"
2021-01-18 12:52:51 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/payload"
2020-12-30 08:01:13 +00:00
"github.com/nspcc-dev/neo-go/pkg/services/notary"
2020-09-28 11:58:04 +00:00
"github.com/nspcc-dev/neo-go/pkg/services/oracle"
2021-02-01 16:00:07 +00:00
"github.com/nspcc-dev/neo-go/pkg/services/stateroot"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/util"
2019-11-15 10:32:40 +00:00
"go.uber.org/atomic"
2019-12-30 07:43:05 +00:00
"go.uber.org/zap"
2018-01-26 18:04:13 +00:00
)
// Defaults and limits used by the Server. The peer numbers are arbitrary at
// the moment.
const (
	// defaultMinPeers replaces a negative MinPeers setting.
	defaultMinPeers = 5
	// defaultAttemptConnPeers replaces a non-positive AttemptConnPeers setting.
	defaultAttemptConnPeers = 20
	// defaultMaxPeers replaces a non-positive MaxPeers setting.
	defaultMaxPeers = 100
	// defaultExtensiblePoolSize replaces a non-positive ExtensiblePoolSize setting.
	defaultExtensiblePoolSize = 20
	// maxBlockBatch is passed to newBlockQueue when the block queue is created.
	maxBlockBatch = 200
	// minPoolCount is the discovery pool size below which the server asks
	// its peers for more addresses.
	minPoolCount = 30
)
2018-03-14 09:36:59 +00:00
// Sentinel errors used by the server, mostly as reasons for refusing or
// dropping a peer connection.
var (
	// errAlreadyConnected is returned when a peer with the same nonce and
	// address is already present in the peer set.
	errAlreadyConnected = errors.New("already connected")
	// errIdenticalID is returned when the peer's nonce equals our own ID.
	errIdenticalID = errors.New("identical node id")
	// errInvalidNetwork is returned when the peer's magic differs from ours.
	errInvalidNetwork = errors.New("invalid network")
	// errMaxPeers is the disconnect reason used when MaxPeers is exceeded.
	errMaxPeers = errors.New("max peers reached")
	// errServerShutdown is the disconnect reason used on Shutdown.
	errServerShutdown = errors.New("server shutdown")
	errInvalidInvType = errors.New("invalid inventory type")
)
2018-02-01 20:28:45 +00:00
2018-03-14 09:36:59 +00:00
type (
// Server represents the local Node in the network. Its transport could
// be of any kind.
Server struct {
2018-03-15 20:45:37 +00:00
// ServerConfig holds the Server configuration.
ServerConfig
2018-02-06 06:43:32 +00:00
2018-03-23 20:36:59 +00:00
// id also known as the nonce of the server.
2018-03-14 09:36:59 +00:00
id uint32
2018-01-26 18:04:13 +00:00
2020-06-18 09:00:51 +00:00
// Network's magic number for correct message decoding.
network netmode . Magic
2020-11-17 12:57:50 +00:00
// stateRootInHeader specifies if block header contain state root.
stateRootInHeader bool
2020-06-18 09:00:51 +00:00
2020-11-27 10:55:48 +00:00
transport Transporter
discovery Discoverer
chain blockchainer . Blockchainer
bQueue * blockQueue
consensus consensus . Service
notaryRequestPool * mempool . Pool
2021-01-18 12:52:51 +00:00
extensiblePool * extpool . Pool
2020-12-30 08:01:13 +00:00
notaryFeer NotaryFeer
notaryModule * notary . Notary
2018-01-26 18:04:13 +00:00
2018-03-14 09:36:59 +00:00
lock sync . RWMutex
peers map [ Peer ] bool
2018-01-26 20:39:34 +00:00
2020-11-26 15:53:10 +00:00
// lastRequestedHeight contains last requested height.
lastRequestedHeight atomic . Uint32
2018-03-14 09:36:59 +00:00
register chan Peer
unregister chan peerDrop
quit chan struct { }
2019-11-15 10:32:40 +00:00
2020-02-18 15:11:55 +00:00
transactions chan * transaction . Transaction
2021-04-02 09:55:56 +00:00
syncReached * atomic . Bool
2019-12-30 07:43:05 +00:00
2021-02-01 16:00:07 +00:00
oracle * oracle . Oracle
stateRoot stateroot . Service
2020-09-28 11:58:04 +00:00
2019-12-30 07:43:05 +00:00
log * zap . Logger
2018-03-14 09:36:59 +00:00
}
2018-03-10 12:04:06 +00:00
2018-03-14 09:36:59 +00:00
peerDrop struct {
peer Peer
reason error
2018-02-01 08:00:42 +00:00
}
2018-03-14 09:36:59 +00:00
)
2019-11-15 17:42:23 +00:00
// randomID returns a 32-bit identifier read from the cryptographic random
// source and decoded as a big-endian integer.
func randomID() uint32 {
	var raw [4]byte
	// The read error is deliberately ignored: whatever ended up in the
	// buffer is used as the ID.
	_, _ = rand.Read(raw[:])
	return binary.BigEndian.Uint32(raw[:])
}
2018-03-14 09:36:59 +00:00
// NewServer returns a new Server, initialized with the given configuration.
2020-04-08 10:56:04 +00:00
func NewServer ( config ServerConfig , chain blockchainer . Blockchainer , log * zap . Logger ) ( * Server , error ) {
2020-12-07 09:52:19 +00:00
return newServerFromConstructors ( config , chain , log , func ( s * Server ) Transporter {
return NewTCPTransport ( s , net . JoinHostPort ( s . ServerConfig . Address , strconv . Itoa ( int ( s . ServerConfig . Port ) ) ) , s . log )
} , consensus . NewService , newDefaultDiscovery )
}
func newServerFromConstructors ( config ServerConfig , chain blockchainer . Blockchainer , log * zap . Logger ,
newTransport func ( * Server ) Transporter ,
newConsensus func ( consensus . Config ) ( consensus . Service , error ) ,
newDiscovery func ( [ ] string , time . Duration , Transporter ) Discoverer ,
) ( * Server , error ) {
2019-12-30 07:43:05 +00:00
if log == nil {
2020-01-22 08:17:51 +00:00
return nil , errors . New ( "logger is a required parameter" )
2019-12-30 07:43:05 +00:00
}
2021-05-04 14:54:16 +00:00
if config . ExtensiblePoolSize <= 0 {
config . ExtensiblePoolSize = defaultExtensiblePoolSize
log . Info ( "ExtensiblePoolSize is not set or wrong, using default value" ,
zap . Int ( "ExtensiblePoolSize" , config . ExtensiblePoolSize ) )
}
2018-03-09 15:55:25 +00:00
s := & Server {
2020-11-17 12:57:50 +00:00
ServerConfig : config ,
chain : chain ,
id : randomID ( ) ,
network : chain . GetConfig ( ) . Magic ,
stateRootInHeader : chain . GetConfig ( ) . StateRootInHeader ,
quit : make ( chan struct { } ) ,
register : make ( chan Peer ) ,
unregister : make ( chan peerDrop ) ,
peers : make ( map [ Peer ] bool ) ,
2021-04-02 09:55:56 +00:00
syncReached : atomic . NewBool ( false ) ,
2021-05-04 14:54:16 +00:00
extensiblePool : extpool . New ( chain , config . ExtensiblePoolSize ) ,
2020-11-17 12:57:50 +00:00
log : log ,
transactions : make ( chan * transaction . Transaction , 64 ) ,
2018-01-26 18:04:13 +00:00
}
2020-11-27 10:55:48 +00:00
if chain . P2PSigExtensionsEnabled ( ) {
2020-12-30 08:01:13 +00:00
s . notaryFeer = NewNotaryFeer ( chain )
2021-05-28 11:55:06 +00:00
s . notaryRequestPool = mempool . New ( chain . GetConfig ( ) . P2PNotaryRequestPayloadPoolSize , 1 , true )
2020-11-27 10:55:48 +00:00
chain . RegisterPostBlock ( func ( bc blockchainer . Blockchainer , txpool * mempool . Pool , _ * block . Block ) {
s . notaryRequestPool . RemoveStale ( func ( t * transaction . Transaction ) bool {
return bc . IsTxStillRelevant ( t , txpool , true )
2020-12-30 08:01:13 +00:00
} , s . notaryFeer )
2020-11-27 10:55:48 +00:00
} )
2021-02-16 10:49:56 +00:00
if config . P2PNotaryCfg . Enabled {
cfg := notary . Config {
MainCfg : config . P2PNotaryCfg ,
Chain : chain ,
Log : log ,
}
2021-03-25 16:18:01 +00:00
n , err := notary . NewNotary ( cfg , s . network , s . notaryRequestPool , func ( tx * transaction . Transaction ) error {
2021-02-17 11:51:54 +00:00
if err := s . RelayTxn ( tx ) ; err != nil {
return fmt . Errorf ( "can't relay completed notary transaction: hash %s, error: %w" , tx . Hash ( ) . StringLE ( ) , err )
2020-12-30 08:01:13 +00:00
}
return nil
} )
if err != nil {
return nil , fmt . Errorf ( "failed to create Notary module: %w" , err )
}
s . notaryModule = n
chain . SetNotary ( n )
}
2021-02-16 10:49:56 +00:00
} else if config . P2PNotaryCfg . Enabled {
2021-06-15 09:22:01 +00:00
return nil , errors . New ( "P2PSigExtensions are disabled, but Notary service is enabled" )
2020-11-27 10:55:48 +00:00
}
2020-02-14 17:46:05 +00:00
s . bQueue = newBlockQueue ( maxBlockBatch , chain , log , func ( b * block . Block ) {
2021-06-04 17:29:47 +00:00
s . tryStartServices ( )
2020-02-14 17:46:05 +00:00
} )
2018-01-26 18:04:13 +00:00
2021-02-02 09:34:27 +00:00
if config . StateRootCfg . Enabled && chain . GetConfig ( ) . StateRootInHeader {
return nil , errors . New ( "`StateRootInHeader` should be disabled when state service is enabled" )
}
2021-04-02 09:12:36 +00:00
sr , err := stateroot . New ( config . StateRootCfg , s . log , chain , s . handleNewPayload )
2021-02-01 16:00:07 +00:00
if err != nil {
return nil , fmt . Errorf ( "can't initialize StateRoot service: %w" , err )
}
s . stateRoot = sr
2020-09-28 11:58:04 +00:00
if config . OracleCfg . Enabled {
orcCfg := oracle . Config {
Log : log ,
Network : config . Net ,
MainCfg : config . OracleCfg ,
Chain : chain ,
}
orc , err := oracle . NewOracle ( orcCfg )
if err != nil {
return nil , fmt . Errorf ( "can't initialize Oracle module: %w" , err )
}
orc . SetOnTransaction ( func ( tx * transaction . Transaction ) {
2021-02-17 11:51:54 +00:00
if err := s . RelayTxn ( tx ) ; err != nil {
2020-09-28 11:58:04 +00:00
orc . Log . Error ( "can't pool oracle tx" ,
zap . String ( "hash" , tx . Hash ( ) . StringLE ( ) ) ,
2021-02-17 11:51:54 +00:00
zap . Error ( err ) )
2020-09-28 11:58:04 +00:00
}
} )
s . oracle = orc
chain . SetOracle ( orc )
}
2020-12-07 09:52:19 +00:00
srv , err := newConsensus ( consensus . Config {
2021-03-15 09:25:52 +00:00
Logger : log ,
Broadcast : s . handleNewPayload ,
Chain : chain ,
ProtocolConfiguration : chain . GetConfig ( ) ,
RequestTx : s . requestTx ,
Wallet : config . Wallet ,
2020-01-13 14:57:40 +00:00
TimePerBlock : config . TimePerBlock ,
2019-11-15 10:32:40 +00:00
} )
if err != nil {
2020-01-22 08:17:51 +00:00
return nil , err
2019-11-15 10:32:40 +00:00
}
s . consensus = srv
2020-01-13 12:22:21 +00:00
if s . MinPeers < 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad MinPeers configured, using the default value" ,
zap . Int ( "configured" , s . MinPeers ) ,
zap . Int ( "actual" , defaultMinPeers ) )
2019-11-01 10:29:54 +00:00
s . MinPeers = defaultMinPeers
}
2019-11-06 12:17:20 +00:00
if s . MaxPeers <= 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad MaxPeers configured, using the default value" ,
zap . Int ( "configured" , s . MaxPeers ) ,
zap . Int ( "actual" , defaultMaxPeers ) )
2019-11-06 12:17:20 +00:00
s . MaxPeers = defaultMaxPeers
}
if s . AttemptConnPeers <= 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad AttemptConnPeers configured, using the default value" ,
zap . Int ( "configured" , s . AttemptConnPeers ) ,
zap . Int ( "actual" , defaultAttemptConnPeers ) )
2019-11-06 12:17:20 +00:00
s . AttemptConnPeers = defaultAttemptConnPeers
}
2020-12-07 09:52:19 +00:00
s . transport = newTransport ( s )
s . discovery = newDiscovery (
2020-10-13 13:30:10 +00:00
s . Seeds ,
2018-03-14 09:36:59 +00:00
s . DialTimeout ,
s . transport ,
)
2018-01-26 18:04:13 +00:00
2020-01-22 08:17:51 +00:00
return s , nil
2018-01-30 10:56:36 +00:00
}
2018-03-23 20:36:59 +00:00
// ID returns the server's ID (its nonce).
func (s *Server) ID() uint32 {
	return s.id
}
2018-03-14 09:36:59 +00:00
// Start will start the server and its underlying transport.
2018-03-23 20:36:59 +00:00
func ( s * Server ) Start ( errChan chan error ) {
2019-12-30 07:43:05 +00:00
s . log . Info ( "node started" ,
zap . Uint32 ( "blockHeight" , s . chain . BlockHeight ( ) ) ,
zap . Uint32 ( "headerHeight" , s . chain . HeaderHeight ( ) ) )
2018-03-17 11:53:21 +00:00
2021-04-02 09:55:56 +00:00
s . tryStartServices ( )
2020-11-27 10:55:48 +00:00
s . initStaleMemPools ( )
2020-01-13 12:22:21 +00:00
2020-02-18 15:11:55 +00:00
go s . broadcastTxLoop ( )
2020-05-07 20:00:38 +00:00
go s . relayBlocksLoop ( )
2019-09-25 16:54:31 +00:00
go s . bQueue . run ( )
2018-03-14 09:36:59 +00:00
go s . transport . Accept ( )
2019-10-29 17:51:17 +00:00
setServerAndNodeVersions ( s . UserAgent , strconv . FormatUint ( uint64 ( s . id ) , 10 ) )
2018-03-14 09:36:59 +00:00
s . run ( )
2018-01-31 19:11:08 +00:00
}
2018-01-30 10:56:36 +00:00
2018-03-23 20:36:59 +00:00
// Shutdown disconnects all peers and stops listening.
func ( s * Server ) Shutdown ( ) {
2019-12-30 07:43:05 +00:00
s . log . Info ( "shutting down server" , zap . Int ( "peers" , s . PeerCount ( ) ) )
2020-02-24 12:54:18 +00:00
s . transport . Close ( )
2020-02-24 12:39:31 +00:00
s . discovery . Close ( )
2021-04-02 09:55:56 +00:00
s . consensus . Shutdown ( )
2020-07-16 08:43:34 +00:00
for p := range s . Peers ( ) {
2020-02-24 12:54:18 +00:00
p . Disconnect ( errServerShutdown )
}
2019-09-25 16:54:31 +00:00
s . bQueue . discard ( )
2021-03-03 09:37:06 +00:00
if s . StateRootCfg . Enabled {
s . stateRoot . Shutdown ( )
}
2020-09-28 11:58:04 +00:00
if s . oracle != nil {
s . oracle . Shutdown ( )
}
2021-01-15 12:40:15 +00:00
if s . notaryModule != nil {
s . notaryModule . Stop ( )
2021-05-28 11:55:06 +00:00
}
if s . chain . P2PSigExtensionsEnabled ( ) {
2021-01-15 12:40:15 +00:00
s . notaryRequestPool . StopSubscriptions ( )
}
2018-03-23 20:36:59 +00:00
close ( s . quit )
}
2020-09-28 11:58:04 +00:00
// GetOracle returns the oracle module instance; it is nil when the Oracle
// service was not enabled in the configuration.
func (s *Server) GetOracle() *oracle.Oracle {
	return s.oracle
}
2021-02-01 16:00:07 +00:00
// GetStateRoot returns the state root service instance used by the server.
func (s *Server) GetStateRoot() stateroot.Service {
	return s.stateRoot
}
2018-04-09 16:58:09 +00:00
// UnconnectedPeers returns a list of peers that are in the discovery peer list
// but are not connected to the server.
2018-03-23 20:36:59 +00:00
func ( s * Server ) UnconnectedPeers ( ) [ ] string {
2020-01-10 12:16:14 +00:00
return s . discovery . UnconnectedPeers ( )
2018-03-23 20:36:59 +00:00
}
2018-04-09 16:58:09 +00:00
// BadPeers returns a list of peers the are flagged as "bad" peers.
2018-03-23 20:36:59 +00:00
func ( s * Server ) BadPeers ( ) [ ] string {
2020-01-10 12:16:14 +00:00
return s . discovery . BadPeers ( )
2018-03-23 20:36:59 +00:00
}
2020-01-10 12:13:29 +00:00
// ConnectedPeers returns the addresses (as reported by PeerAddr) of the
// currently connected peers.
func (s *Server) ConnectedPeers() []string {
	s.lock.RLock()
	defer s.lock.RUnlock()

	addrs := make([]string, 0, len(s.peers))
	for peer := range s.peers {
		addr := peer.PeerAddr().String()
		addrs = append(addrs, addr)
	}
	return addrs
}
2020-01-27 09:44:05 +00:00
// run is a goroutine that starts another goroutine to manage protocol specifics
// while itself dealing with peers management (handling connects/disconnects).
2018-03-14 09:36:59 +00:00
func ( s * Server ) run ( ) {
2020-01-27 09:44:05 +00:00
go s . runProto ( )
2018-03-09 15:55:25 +00:00
for {
2019-11-06 12:17:20 +00:00
if s . PeerCount ( ) < s . MinPeers {
s . discovery . RequestRemote ( s . AttemptConnPeers )
2019-09-12 13:19:18 +00:00
}
2019-09-13 09:03:07 +00:00
if s . discovery . PoolCount ( ) < minPoolCount {
2020-05-21 10:35:44 +00:00
s . broadcastHPMessage ( NewMessage ( CMDGetAddr , payload . NewNullPayload ( ) ) )
2019-09-13 09:03:07 +00:00
}
2018-03-14 09:36:59 +00:00
select {
case <- s . quit :
return
case p := <- s . register :
2019-11-06 09:38:47 +00:00
s . lock . Lock ( )
2018-03-14 09:36:59 +00:00
s . peers [ p ] = true
2019-11-06 09:38:47 +00:00
s . lock . Unlock ( )
2019-11-06 12:17:20 +00:00
peerCount := s . PeerCount ( )
2020-01-28 16:39:12 +00:00
s . log . Info ( "new peer connected" , zap . Stringer ( "addr" , p . RemoteAddr ( ) ) , zap . Int ( "peerCount" , peerCount ) )
2019-11-06 12:17:20 +00:00
if peerCount > s . MaxPeers {
s . lock . RLock ( )
// Pick a random peer and drop connection to it.
for peer := range s . peers {
2020-02-24 09:39:46 +00:00
// It will send us unregister signal.
go peer . Disconnect ( errMaxPeers )
2019-11-06 12:17:20 +00:00
break
}
s . lock . RUnlock ( )
}
2019-10-29 17:51:17 +00:00
updatePeersConnectedMetric ( s . PeerCount ( ) )
2018-03-14 09:36:59 +00:00
case drop := <- s . unregister :
2019-11-06 09:38:47 +00:00
s . lock . Lock ( )
2019-09-13 12:36:53 +00:00
if s . peers [ drop . peer ] {
delete ( s . peers , drop . peer )
2019-11-06 09:38:47 +00:00
s . lock . Unlock ( )
2019-12-30 07:43:05 +00:00
s . log . Warn ( "peer disconnected" ,
zap . Stringer ( "addr" , drop . peer . RemoteAddr ( ) ) ,
zap . String ( "reason" , drop . reason . Error ( ) ) ,
zap . Int ( "peerCount" , s . PeerCount ( ) ) )
2019-11-06 07:55:21 +00:00
addr := drop . peer . PeerAddr ( ) . String ( )
2019-11-27 08:56:56 +00:00
if drop . reason == errIdenticalID {
s . discovery . RegisterBadAddr ( addr )
2020-12-23 13:13:57 +00:00
} else if drop . reason == errAlreadyConnected {
// There is a race condition when peer can be disconnected twice for the this reason
// which can lead to no connections to peer at all. Here we check for such a possibility.
stillConnected := false
s . lock . RLock ( )
verDrop := drop . peer . Version ( )
addr := drop . peer . PeerAddr ( ) . String ( )
if verDrop != nil {
for peer := range s . peers {
ver := peer . Version ( )
// Already connected, drop this connection.
if ver != nil && ver . Nonce == verDrop . Nonce && peer . PeerAddr ( ) . String ( ) == addr {
stillConnected = true
}
}
}
s . lock . RUnlock ( )
if ! stillConnected {
s . discovery . UnregisterConnectedAddr ( addr )
s . discovery . BackFill ( addr )
}
} else {
2019-11-27 08:56:56 +00:00
s . discovery . UnregisterConnectedAddr ( addr )
s . discovery . BackFill ( addr )
}
2019-10-29 17:51:17 +00:00
updatePeersConnectedMetric ( s . PeerCount ( ) )
2019-11-06 09:38:47 +00:00
} else {
// else the peer is already gone, which can happen
// because we have two goroutines sending signals here
s . lock . Unlock ( )
2019-09-13 12:36:53 +00:00
}
2018-03-09 15:55:25 +00:00
}
2018-01-31 19:11:08 +00:00
}
2018-01-27 12:39:07 +00:00
}
2020-01-27 09:44:05 +00:00
// runProto is a goroutine that manages server-wide protocol events.
func ( s * Server ) runProto ( ) {
pingTimer := time . NewTimer ( s . PingInterval )
for {
prevHeight := s . chain . BlockHeight ( )
select {
case <- s . quit :
return
case <- pingTimer . C :
if s . chain . BlockHeight ( ) == prevHeight {
// Get a copy of s.peers to avoid holding a lock while sending.
for peer := range s . Peers ( ) {
2021-08-06 08:26:19 +00:00
_ = peer . SendPing ( NewMessage ( CMDPing , payload . NewPing ( s . chain . BlockHeight ( ) , s . id ) ) )
2020-01-27 09:44:05 +00:00
}
}
pingTimer . Reset ( s . PingInterval )
}
}
}
2021-04-02 09:55:56 +00:00
func ( s * Server ) tryStartServices ( ) {
if s . syncReached . Load ( ) {
2019-11-15 10:32:40 +00:00
return
}
2021-04-02 09:55:56 +00:00
if s . IsInSync ( ) && s . syncReached . CAS ( false , true ) {
s . log . Info ( "node reached synchronized state, starting services" )
if s . Wallet != nil {
2019-11-15 10:32:40 +00:00
s . consensus . Start ( )
}
2021-04-02 10:12:06 +00:00
if s . StateRootCfg . Enabled {
s . stateRoot . Run ( )
}
if s . oracle != nil {
go s . oracle . Run ( )
}
2021-05-28 11:55:06 +00:00
if s . chain . P2PSigExtensionsEnabled ( ) {
s . notaryRequestPool . RunSubscriptions ( ) // WSClient is also a subscriber.
}
2021-04-02 10:12:06 +00:00
if s . notaryModule != nil {
go s . notaryModule . Run ( )
}
2019-11-15 10:32:40 +00:00
}
}
2021-05-28 11:55:06 +00:00
// SubscribeForNotaryRequests adds given channel to a notary request event
// broadcasting, so when a new P2PNotaryRequest is received or an existing
// P2PNotaryRequest is removed from pool you'll receive it via this channel.
// Make sure it's read from regularly as not reading these events might affect
// other Server functions.
// Ensure that P2PSigExtensions are enabled before calling this method: it
// panics otherwise.
func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) {
	enabled := s.chain.P2PSigExtensionsEnabled()
	if !enabled {
		panic("P2PSigExtensions are disabled")
	}
	s.notaryRequestPool.SubscribeForTransactions(ch)
}
// UnsubscribeFromNotaryRequests unsubscribes given channel from notary request
// notifications, you can close it afterwards. Passing non-subscribed channel
// is a no-op.
// Ensure that P2PSigExtensions are enabled before calling this method: it
// panics otherwise.
func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) {
	enabled := s.chain.P2PSigExtensionsEnabled()
	if !enabled {
		panic("P2PSigExtensions are disabled")
	}
	s.notaryRequestPool.UnsubscribeFromTransactions(ch)
}
2018-03-23 20:36:59 +00:00
// Peers returns the current list of peers connected to
// the server.
func ( s * Server ) Peers ( ) map [ Peer ] bool {
2019-11-15 10:32:40 +00:00
s . lock . RLock ( )
defer s . lock . RUnlock ( )
peers := make ( map [ Peer ] bool , len ( s . peers ) )
for k , v := range s . peers {
peers [ k ] = v
}
return peers
2018-03-23 20:36:59 +00:00
}
2018-03-14 09:36:59 +00:00
// PeerCount returns the number of current connected peers.
func ( s * Server ) PeerCount ( ) int {
s . lock . RLock ( )
defer s . lock . RUnlock ( )
return len ( s . peers )
2018-02-01 20:28:45 +00:00
}
2019-12-02 07:51:45 +00:00
// HandshakedPeersCount returns the number of connected peers
// which have already performed handshake.
func (s *Server) HandshakedPeersCount() int {
	s.lock.RLock()
	defer s.lock.RUnlock()

	handshaked := 0
	for peer := range s.peers {
		if !peer.Handshaked() {
			continue
		}
		handshaked++
	}
	return handshaked
}
2018-02-01 20:28:45 +00:00
2020-01-21 14:26:08 +00:00
// getVersionMsg returns current version message.
2020-05-22 09:17:17 +00:00
func ( s * Server ) getVersionMsg ( ) ( * Message , error ) {
2020-06-10 07:01:21 +00:00
port , err := s . Port ( )
2020-05-22 09:17:17 +00:00
if err != nil {
2020-06-10 07:01:21 +00:00
return nil , err
2020-05-22 09:17:17 +00:00
}
capabilities := [ ] capability . Capability {
{
Type : capability . TCPServer ,
Data : & capability . Server {
Port : port ,
} ,
} ,
}
if s . Relay {
capabilities = append ( capabilities , capability . Capability {
Type : capability . FullNode ,
Data : & capability . Node {
StartHeight : s . chain . BlockHeight ( ) ,
} ,
} )
}
2018-03-14 09:36:59 +00:00
payload := payload . NewVersion (
2020-05-21 10:35:44 +00:00
s . Net ,
2018-03-14 09:36:59 +00:00
s . id ,
s . UserAgent ,
2020-05-22 09:17:17 +00:00
capabilities ,
2018-03-09 15:55:25 +00:00
)
2020-05-22 09:17:17 +00:00
return NewMessage ( CMDVersion , payload ) , nil
2018-03-14 09:36:59 +00:00
}
2018-03-09 15:55:25 +00:00
2020-02-14 17:46:05 +00:00
// IsInSync answers the question of whether the server is in sync with the
// network or not (at least how the server itself sees it). The server operates
// with the data that it has: the number of handshaked peers (that has to be at
// least MinPeers) and the height of these peers (more than 2/3 of them must
// not be higher than our chain). Ideally we would check for the highest of the
// peers, but the problem is that they can lie to us and send whatever height
// they want to. With MinPeers set to zero the server is always in sync.
func (s *Server) IsInSync() bool {
	if s.MinPeers == 0 {
		return true
	}

	ourHeight := s.chain.BlockHeight()
	handshaked, notAhead := 0, 0

	s.lock.RLock()
	for peer := range s.peers {
		if !peer.Handshaked() {
			continue
		}
		handshaked++
		if ourHeight >= peer.LastBlockIndex() {
			notAhead++
		}
	}
	s.lock.RUnlock()

	// Checking bQueue would also be nice, but it can be filled with garbage
	// easily at the moment.
	if handshaked < s.MinPeers {
		return false
	}
	return 3*notAhead > 2*handshaked // && s.bQueue.length() == 0
}
2018-03-14 09:36:59 +00:00
// When a peer sends out his version we reply with verack after validating
// the version.
func ( s * Server ) handleVersionCmd ( p Peer , version * payload . Version ) error {
2019-09-13 12:43:22 +00:00
err := p . HandleVersion ( version )
if err != nil {
return err
2018-03-14 09:36:59 +00:00
}
if s . id == version . Nonce {
return errIdenticalID
2018-01-28 13:59:32 +00:00
}
2020-05-21 10:35:44 +00:00
// Make sure both server and peer are operating on
// the same network.
if s . Net != version . Magic {
return errInvalidNetwork
}
2019-11-06 09:39:17 +00:00
peerAddr := p . PeerAddr ( ) . String ( )
2020-01-28 16:10:13 +00:00
s . discovery . RegisterConnectedAddr ( peerAddr )
2019-11-06 09:39:17 +00:00
s . lock . RLock ( )
for peer := range s . peers {
2020-01-28 10:54:09 +00:00
if p == peer {
continue
}
ver := peer . Version ( )
2019-11-06 09:39:17 +00:00
// Already connected, drop this connection.
2020-01-28 10:54:09 +00:00
if ver != nil && ver . Nonce == version . Nonce && peer . PeerAddr ( ) . String ( ) == peerAddr {
2019-11-06 09:39:17 +00:00
s . lock . RUnlock ( )
return errAlreadyConnected
}
}
s . lock . RUnlock ( )
2020-12-23 12:32:16 +00:00
return p . SendVersionAck ( NewMessage ( CMDVerack , payload . NewNullPayload ( ) ) )
2018-01-28 13:59:32 +00:00
}
2018-03-14 09:36:59 +00:00
// handleBlockCmd processes the received block received from its peer.
2020-01-14 12:32:07 +00:00
func ( s * Server ) handleBlockCmd ( p Peer , block * block . Block ) error {
2019-09-25 16:54:31 +00:00
return s . bQueue . putBlock ( block )
2018-03-14 09:36:59 +00:00
}
2018-02-01 08:00:42 +00:00
2020-01-17 10:17:19 +00:00
// handlePing processes ping request.
func ( s * Server ) handlePing ( p Peer , ping * payload . Ping ) error {
2020-08-14 13:22:15 +00:00
err := p . HandlePing ( ping )
if err != nil {
return err
}
if s . chain . BlockHeight ( ) < ping . LastBlockIndex {
err = s . requestBlocks ( p )
if err != nil {
return err
}
}
2020-05-21 10:35:44 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDPong , payload . NewPing ( s . chain . BlockHeight ( ) , s . id ) ) )
2020-01-17 10:17:19 +00:00
}
// handlePing processes pong request.
func ( s * Server ) handlePong ( p Peer , pong * payload . Ping ) error {
2020-01-20 16:02:19 +00:00
err := p . HandlePong ( pong )
if err != nil {
return err
2020-01-17 10:17:19 +00:00
}
2020-07-31 14:12:13 +00:00
if s . chain . BlockHeight ( ) < pong . LastBlockIndex {
return s . requestBlocks ( p )
2020-01-17 10:17:19 +00:00
}
return nil
}
2019-10-22 14:56:03 +00:00
// handleInvCmd processes the received inventory.
2018-03-14 09:36:59 +00:00
func ( s * Server ) handleInvCmd ( p Peer , inv * payload . Inventory ) error {
2019-12-02 08:02:52 +00:00
reqHashes := make ( [ ] util . Uint256 , 0 )
var typExists = map [ payload . InventoryType ] func ( util . Uint256 ) bool {
payload . TXType : s . chain . HasTransaction ,
payload . BlockType : s . chain . HasBlock ,
2021-01-14 13:38:40 +00:00
payload . ExtensibleType : func ( h util . Uint256 ) bool {
2021-01-18 12:52:51 +00:00
cp := s . extensiblePool . Get ( h )
2019-12-02 08:02:52 +00:00
return cp != nil
} ,
2020-11-27 10:55:48 +00:00
payload . P2PNotaryRequestType : func ( h util . Uint256 ) bool {
return s . notaryRequestPool . ContainsKey ( h )
} ,
2019-12-02 08:02:52 +00:00
}
if exists := typExists [ inv . Type ] ; exists != nil {
for _ , hash := range inv . Hashes {
if ! exists ( hash ) {
reqHashes = append ( reqHashes , hash )
}
}
}
if len ( reqHashes ) > 0 {
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDGetData , payload . NewInventory ( inv . Type , reqHashes ) )
2020-01-16 18:16:31 +00:00
pkt , err := msg . Bytes ( )
if err != nil {
return err
}
2021-01-14 13:38:40 +00:00
if inv . Type == payload . ExtensibleType {
2020-12-22 12:55:55 +00:00
return p . EnqueueHPPacket ( true , pkt )
2020-01-16 18:16:31 +00:00
}
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PPacket ( pkt )
2019-12-02 08:02:52 +00:00
}
return nil
2018-03-09 15:55:25 +00:00
}
2018-02-01 08:00:42 +00:00
2020-06-19 12:03:40 +00:00
// handleMempoolCmd handles getmempool command: it announces the hashes of
// all verified mempool transactions to the peer with inv messages of at most
// payload.MaxHashesCount hashes each.
func (s *Server) handleMempoolCmd(p Peer) error {
	txs := s.chain.GetMemPool().GetVerifiedTransactions()
	batch := make([]util.Uint256, 0, payload.MaxHashesCount)
	lastIdx := len(txs) - 1
	for i := range txs {
		batch = append(batch, txs[i].Hash())
		// Keep collecting until the batch is full or the list ends.
		batchFull := len(batch) >= payload.MaxHashesCount
		if !batchFull && i != lastIdx {
			continue
		}
		inv := payload.NewInventory(payload.TXType, batch)
		if err := p.EnqueueP2PMessage(NewMessage(CMDInv, inv)); err != nil {
			return err
		}
		batch = batch[:0]
	}
	return nil
}
2019-10-24 07:18:30 +00:00
// handleInvCmd processes the received inventory.
func ( s * Server ) handleGetDataCmd ( p Peer , inv * payload . Inventory ) error {
2020-07-08 12:25:58 +00:00
var notFound [ ] util . Uint256
2020-01-16 18:16:31 +00:00
for _ , hash := range inv . Hashes {
var msg * Message
switch inv . Type {
case payload . TXType :
2019-10-24 07:18:30 +00:00
tx , _ , err := s . chain . GetTransaction ( hash )
if err == nil {
2020-05-21 10:35:44 +00:00
msg = NewMessage ( CMDTX , tx )
2020-07-08 12:25:58 +00:00
} else {
notFound = append ( notFound , hash )
2019-10-24 07:18:30 +00:00
}
2020-01-16 18:16:31 +00:00
case payload . BlockType :
2019-10-24 07:18:30 +00:00
b , err := s . chain . GetBlock ( hash )
if err == nil {
2020-05-21 10:35:44 +00:00
msg = NewMessage ( CMDBlock , b )
2020-07-08 12:25:58 +00:00
} else {
notFound = append ( notFound , hash )
2019-10-24 07:18:30 +00:00
}
2021-01-14 13:38:40 +00:00
case payload . ExtensibleType :
2021-01-18 12:52:51 +00:00
if cp := s . extensiblePool . Get ( hash ) ; cp != nil {
2021-01-14 13:38:40 +00:00
msg = NewMessage ( CMDExtensible , cp )
2020-01-16 18:16:31 +00:00
}
2020-11-27 10:55:48 +00:00
case payload . P2PNotaryRequestType :
if nrp , ok := s . notaryRequestPool . TryGetData ( hash ) ; ok { // already have checked P2PSigExtEnabled
msg = NewMessage ( CMDP2PNotaryRequest , nrp . ( * payload . P2PNotaryRequest ) )
} else {
notFound = append ( notFound , hash )
}
2020-01-16 18:16:31 +00:00
}
if msg != nil {
pkt , err := msg . Bytes ( )
2020-01-27 08:55:03 +00:00
if err == nil {
2021-01-14 13:38:40 +00:00
if inv . Type == payload . ExtensibleType {
2020-12-22 12:55:55 +00:00
err = p . EnqueueHPPacket ( true , pkt )
2020-01-27 08:55:03 +00:00
} else {
2020-01-23 16:40:40 +00:00
err = p . EnqueueP2PPacket ( pkt )
2020-01-27 08:55:03 +00:00
}
}
2020-01-16 18:16:31 +00:00
if err != nil {
return err
}
2019-11-08 15:40:21 +00:00
}
2019-10-24 07:18:30 +00:00
}
2020-07-08 12:25:58 +00:00
if len ( notFound ) != 0 {
return p . EnqueueP2PMessage ( NewMessage ( CMDNotFound , payload . NewInventory ( inv . Type , notFound ) ) )
}
2019-10-24 07:18:30 +00:00
return nil
}
2019-12-25 16:40:18 +00:00
// handleGetBlocksCmd processes the getblocks request.
func ( s * Server ) handleGetBlocksCmd ( p Peer , gb * payload . GetBlocks ) error {
2020-05-22 14:30:56 +00:00
count := gb . Count
if gb . Count < 0 || gb . Count > payload . MaxHashesCount {
count = payload . MaxHashesCount
2019-12-25 16:40:18 +00:00
}
2020-05-22 14:30:56 +00:00
start , err := s . chain . GetHeader ( gb . HashStart )
2019-12-25 16:40:18 +00:00
if err != nil {
return err
}
blockHashes := make ( [ ] util . Uint256 , 0 )
2020-12-07 15:40:04 +00:00
for i := start . Index + 1 ; i <= start . Index + uint32 ( count ) ; i ++ {
2019-12-25 16:40:18 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
2020-05-22 14:30:56 +00:00
if hash . Equals ( util . Uint256 { } ) {
2019-12-25 16:40:18 +00:00
break
}
blockHashes = append ( blockHashes , hash )
}
if len ( blockHashes ) == 0 {
return nil
}
payload := payload . NewInventory ( payload . BlockType , blockHashes )
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDInv , payload )
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PMessage ( msg )
2019-12-25 16:40:18 +00:00
}
2020-07-31 10:58:22 +00:00
// handleGetBlockByIndexCmd processes the getblockbyindex request.
func ( s * Server ) handleGetBlockByIndexCmd ( p Peer , gbd * payload . GetBlockByIndex ) error {
2020-07-31 11:17:14 +00:00
count := gbd . Count
if gbd . Count < 0 || gbd . Count > payload . MaxHashesCount {
count = payload . MaxHashesCount
}
for i := gbd . IndexStart ; i < gbd . IndexStart + uint32 ( count ) ; i ++ {
2020-07-31 11:51:51 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
if hash . Equals ( util . Uint256 { } ) {
break
}
b , err := s . chain . GetBlock ( hash )
2020-05-22 12:43:46 +00:00
if err != nil {
2020-07-31 11:51:51 +00:00
break
2020-05-22 12:43:46 +00:00
}
msg := NewMessage ( CMDBlock , b )
2020-07-31 11:51:51 +00:00
if err = p . EnqueueP2PMessage ( msg ) ; err != nil {
return err
}
2020-05-22 12:43:46 +00:00
}
return nil
}
2019-11-29 08:08:22 +00:00
// handleGetHeadersCmd processes the getheaders request.
2020-07-31 11:47:42 +00:00
func ( s * Server ) handleGetHeadersCmd ( p Peer , gh * payload . GetBlockByIndex ) error {
if gh . IndexStart > s . chain . HeaderHeight ( ) {
return nil
2019-11-29 08:08:22 +00:00
}
2020-07-31 11:47:42 +00:00
count := gh . Count
if gh . Count < 0 || gh . Count > payload . MaxHeadersAllowed {
count = payload . MaxHeadersAllowed
2019-11-29 08:08:22 +00:00
}
resp := payload . Headers { }
2020-07-31 11:47:42 +00:00
resp . Hdrs = make ( [ ] * block . Header , 0 , count )
for i := gh . IndexStart ; i < gh . IndexStart + uint32 ( count ) ; i ++ {
2019-11-29 08:08:22 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
2020-05-22 14:30:56 +00:00
if hash . Equals ( util . Uint256 { } ) {
2019-11-29 08:08:22 +00:00
break
}
header , err := s . chain . GetHeader ( hash )
if err != nil {
break
}
resp . Hdrs = append ( resp . Hdrs , header )
}
if len ( resp . Hdrs ) == 0 {
return nil
}
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDHeaders , & resp )
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PMessage ( msg )
2019-11-29 08:08:22 +00:00
}
2021-01-14 13:38:40 +00:00
// handleExtensibleCmd processes received extensible payload.
func ( s * Server ) handleExtensibleCmd ( e * payload . Extensible ) error {
2021-04-02 09:55:56 +00:00
if ! s . syncReached . Load ( ) {
return nil
2021-02-05 11:54:43 +00:00
}
2021-01-18 12:52:51 +00:00
ok , err := s . extensiblePool . Add ( e )
if err != nil {
2021-01-14 13:38:40 +00:00
return err
}
2021-01-18 12:52:51 +00:00
if ! ok { // payload is already in cache
return nil
2021-01-14 13:38:40 +00:00
}
switch e . Category {
case consensus . Category :
s . consensus . OnPayload ( e )
2021-02-01 16:00:07 +00:00
case stateroot . Category :
err := s . stateRoot . OnPayload ( e )
if err != nil {
return err
}
2021-01-14 13:38:40 +00:00
default :
return errors . New ( "invalid category" )
}
2021-01-18 12:52:51 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . ExtensibleType , [ ] util . Uint256 { e . Hash ( ) } ) )
if e . Category == consensus . Category {
s . broadcastHPMessage ( msg )
} else {
s . broadcastMessage ( msg )
}
2019-11-08 15:40:21 +00:00
return nil
}
2019-11-15 10:32:40 +00:00
// handleTxCmd processes received transaction.
// It never returns an error.
func ( s * Server ) handleTxCmd ( tx * transaction . Transaction ) error {
2019-11-29 08:09:54 +00:00
// It's OK for it to fail for various reasons like tx already existing
// in the pool.
2021-02-17 11:51:54 +00:00
if s . verifyAndPoolTX ( tx ) == nil {
2020-01-29 08:56:40 +00:00
s . consensus . OnTransaction ( tx )
2020-11-27 10:55:48 +00:00
s . broadcastTX ( tx , nil )
}
return nil
}
// handleP2PNotaryRequestCmd process received P2PNotaryRequest payload.
func ( s * Server ) handleP2PNotaryRequestCmd ( r * payload . P2PNotaryRequest ) error {
if ! s . chain . P2PSigExtensionsEnabled ( ) {
return errors . New ( "P2PNotaryRequestCMD was received, but P2PSignatureExtensions are disabled" )
}
2021-02-17 11:51:54 +00:00
// It's OK for it to fail for various reasons like request already existing
// in the pool.
2021-05-12 17:14:35 +00:00
_ = s . RelayP2PNotaryRequest ( r )
2021-02-08 08:48:28 +00:00
return nil
}
// RelayP2PNotaryRequest adds given request to the pool and relays. It does not check
// P2PSigExtensions enabled.
2021-02-17 11:51:54 +00:00
func ( s * Server ) RelayP2PNotaryRequest ( r * payload . P2PNotaryRequest ) error {
err := s . verifyAndPoolNotaryRequest ( r )
if err == nil {
2020-11-27 10:55:48 +00:00
s . broadcastP2PNotaryRequestPayload ( nil , r )
}
2021-02-17 11:51:54 +00:00
return err
2020-11-27 10:55:48 +00:00
}
// verifyAndPoolNotaryRequest verifies NotaryRequest payload and adds it to the payload mempool.
2021-02-17 11:51:54 +00:00
func ( s * Server ) verifyAndPoolNotaryRequest ( r * payload . P2PNotaryRequest ) error {
return s . chain . PoolTxWithData ( r . FallbackTransaction , r , s . notaryRequestPool , s . notaryFeer , verifyNotaryRequest )
2020-11-27 10:55:48 +00:00
}
// verifyNotaryRequest is a function for state-dependant P2PNotaryRequest payload verification which is executed before ordinary blockchain's verification.
func verifyNotaryRequest ( bc blockchainer . Blockchainer , _ * transaction . Transaction , data interface { } ) error {
r := data . ( * payload . P2PNotaryRequest )
payer := r . FallbackTransaction . Signers [ 1 ] . Account
if err := bc . VerifyWitness ( payer , r , & r . Witness , bc . GetPolicer ( ) . GetMaxVerificationGAS ( ) ) ; err != nil {
return fmt . Errorf ( "bad P2PNotaryRequest payload witness: %w" , err )
}
2020-12-30 08:01:13 +00:00
notaryHash := bc . GetNotaryContractScriptHash ( )
if r . FallbackTransaction . Sender ( ) != notaryHash {
2020-11-27 10:55:48 +00:00
return errors . New ( "P2PNotary contract should be a sender of the fallback transaction" )
}
depositExpiration := bc . GetNotaryDepositExpiration ( payer )
if r . FallbackTransaction . ValidUntilBlock >= depositExpiration {
return fmt . Errorf ( "fallback transaction is valid after deposit is unlocked: ValidUntilBlock is %d, deposit lock expires at %d" , r . FallbackTransaction . ValidUntilBlock , depositExpiration )
2020-01-29 08:56:40 +00:00
}
2019-11-15 10:32:40 +00:00
return nil
}
2020-11-27 10:55:48 +00:00
func ( s * Server ) broadcastP2PNotaryRequestPayload ( _ * transaction . Transaction , data interface { } ) {
2021-02-08 15:58:10 +00:00
r := data . ( * payload . P2PNotaryRequest ) // we can guarantee that cast is successful
2020-11-27 10:55:48 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . P2PNotaryRequestType , [ ] util . Uint256 { r . FallbackTransaction . Hash ( ) } ) )
s . broadcastMessage ( msg )
}
2019-09-13 09:03:07 +00:00
// handleAddrCmd will process received addresses.
func ( s * Server ) handleAddrCmd ( p Peer , addrs * payload . AddressList ) error {
2020-11-25 10:34:38 +00:00
if ! p . CanProcessAddr ( ) {
return errors . New ( "unexpected addr received" )
}
2021-03-26 09:31:07 +00:00
dups := make ( map [ string ] bool )
2019-09-13 09:03:07 +00:00
for _ , a := range addrs . Addrs {
2020-05-22 09:59:18 +00:00
addr , err := a . GetTCPAddress ( )
2021-03-26 09:31:07 +00:00
if err == nil && ! dups [ addr ] {
dups [ addr ] = true
2020-05-22 09:59:18 +00:00
s . discovery . BackFill ( addr )
}
2019-09-13 09:03:07 +00:00
}
return nil
}
2019-09-13 17:38:34 +00:00
// handleGetAddrCmd sends to the peer some good addresses that we know of.
func ( s * Server ) handleGetAddrCmd ( p Peer ) error {
addrs := s . discovery . GoodPeers ( )
2020-10-07 20:29:20 +00:00
if len ( addrs ) > payload . MaxAddrsCount {
addrs = addrs [ : payload . MaxAddrsCount ]
2019-09-13 17:38:34 +00:00
}
alist := payload . NewAddressList ( len ( addrs ) )
ts := time . Now ( )
for i , addr := range addrs {
// we know it's a good address, so it can't fail
2020-05-22 09:59:18 +00:00
netaddr , _ := net . ResolveTCPAddr ( "tcp" , addr . Address )
alist . Addrs [ i ] = payload . NewAddressAndTime ( netaddr , ts , addr . Capabilities )
2019-09-13 17:38:34 +00:00
}
2020-05-21 10:35:44 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDAddr , alist ) )
2019-09-13 17:38:34 +00:00
}
2020-07-31 14:12:13 +00:00
// requestBlocks sends a CMDGetBlockByIndex message to the peer
2018-03-14 09:36:59 +00:00
// to sync up in blocks. A maximum of maxBlockBatch will
2020-11-26 15:53:10 +00:00
// send at once. Two things we need to take care of:
// 1. If possible, blocks should be fetched in parallel.
// height..+500 to one peer, height+500..+1000 to another etc.
// 2. Every block must eventually be fetched even if peer sends no answer.
// Thus the following algorithm is used:
// 1. Block range is divided into chunks of payload.MaxHashesCount.
// 2. Send requests for chunk in increasing order.
// 3. After all requests were sent, request random height.
2019-09-13 12:36:53 +00:00
func ( s * Server ) requestBlocks ( p Peer ) error {
2020-11-26 15:53:10 +00:00
var currHeight = s . chain . BlockHeight ( )
var peerHeight = p . LastBlockIndex ( )
var needHeight uint32
// lastRequestedHeight can only be increased.
for {
old := s . lastRequestedHeight . Load ( )
if old <= currHeight {
needHeight = currHeight + 1
if ! s . lastRequestedHeight . CAS ( old , needHeight ) {
continue
}
} else if old < currHeight + ( blockCacheSize - payload . MaxHashesCount ) {
needHeight = currHeight + 1
if peerHeight > old + payload . MaxHashesCount {
needHeight = old + payload . MaxHashesCount
if ! s . lastRequestedHeight . CAS ( old , needHeight ) {
continue
}
}
} else {
index := mrand . Intn ( blockCacheSize / payload . MaxHashesCount )
needHeight = currHeight + 1 + uint32 ( index * payload . MaxHashesCount )
}
break
}
payload := payload . NewGetBlockByIndex ( needHeight , - 1 )
2020-07-31 14:12:13 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDGetBlockByIndex , payload ) )
2018-02-01 08:00:42 +00:00
}
2019-10-22 14:56:03 +00:00
// handleMessage processes the given message.
2018-04-13 10:14:08 +00:00
func ( s * Server ) handleMessage ( peer Peer , msg * Message ) error {
2020-01-28 13:40:38 +00:00
s . log . Debug ( "got msg" ,
zap . Stringer ( "addr" , peer . RemoteAddr ( ) ) ,
2020-05-19 11:54:51 +00:00
zap . String ( "type" , msg . Command . String ( ) ) )
2020-01-28 13:40:38 +00:00
2019-09-13 12:43:22 +00:00
if peer . Handshaked ( ) {
2019-10-24 10:10:10 +00:00
if inv , ok := msg . Payload . ( * payload . Inventory ) ; ok {
2020-11-27 10:55:48 +00:00
if ! inv . Type . Valid ( s . chain . P2PSigExtensionsEnabled ( ) ) || len ( inv . Hashes ) == 0 {
2019-10-24 10:10:10 +00:00
return errInvalidInvType
}
}
2020-05-19 11:54:51 +00:00
switch msg . Command {
2019-09-13 12:43:22 +00:00
case CMDAddr :
addrs := msg . Payload . ( * payload . AddressList )
return s . handleAddrCmd ( peer , addrs )
2019-09-13 17:38:34 +00:00
case CMDGetAddr :
// it has no payload
return s . handleGetAddrCmd ( peer )
2019-12-25 16:40:18 +00:00
case CMDGetBlocks :
gb := msg . Payload . ( * payload . GetBlocks )
return s . handleGetBlocksCmd ( peer , gb )
2020-07-31 10:58:22 +00:00
case CMDGetBlockByIndex :
gbd := msg . Payload . ( * payload . GetBlockByIndex )
return s . handleGetBlockByIndexCmd ( peer , gbd )
2019-10-24 07:18:30 +00:00
case CMDGetData :
inv := msg . Payload . ( * payload . Inventory )
return s . handleGetDataCmd ( peer , inv )
2019-11-29 08:08:22 +00:00
case CMDGetHeaders :
2020-07-31 11:47:42 +00:00
gh := msg . Payload . ( * payload . GetBlockByIndex )
2019-11-29 08:08:22 +00:00
return s . handleGetHeadersCmd ( peer , gh )
2019-09-13 12:43:22 +00:00
case CMDInv :
inventory := msg . Payload . ( * payload . Inventory )
return s . handleInvCmd ( peer , inventory )
2020-06-19 12:03:40 +00:00
case CMDMempool :
// no payload
return s . handleMempoolCmd ( peer )
2019-09-13 12:43:22 +00:00
case CMDBlock :
2020-01-14 12:32:07 +00:00
block := msg . Payload . ( * block . Block )
2019-09-13 12:43:22 +00:00
return s . handleBlockCmd ( peer , block )
2021-01-14 13:38:40 +00:00
case CMDExtensible :
cp := msg . Payload . ( * payload . Extensible )
return s . handleExtensibleCmd ( cp )
2019-11-15 10:32:40 +00:00
case CMDTX :
tx := msg . Payload . ( * transaction . Transaction )
return s . handleTxCmd ( tx )
2020-11-27 10:55:48 +00:00
case CMDP2PNotaryRequest :
r := msg . Payload . ( * payload . P2PNotaryRequest )
return s . handleP2PNotaryRequestCmd ( r )
2020-01-17 10:17:19 +00:00
case CMDPing :
ping := msg . Payload . ( * payload . Ping )
return s . handlePing ( peer , ping )
case CMDPong :
pong := msg . Payload . ( * payload . Ping )
return s . handlePong ( peer , pong )
2019-09-13 12:43:22 +00:00
case CMDVersion , CMDVerack :
2020-05-19 11:54:51 +00:00
return fmt . Errorf ( "received '%s' after the handshake" , msg . Command . String ( ) )
2019-09-13 12:43:22 +00:00
}
} else {
2020-05-19 11:54:51 +00:00
switch msg . Command {
2019-09-13 12:43:22 +00:00
case CMDVersion :
version := msg . Payload . ( * payload . Version )
return s . handleVersionCmd ( peer , version )
case CMDVerack :
err := peer . HandleVersionAck ( )
if err != nil {
return err
}
2020-01-15 14:03:42 +00:00
go peer . StartProtocol ( )
2019-11-15 10:32:40 +00:00
2021-04-02 09:55:56 +00:00
s . tryStartServices ( )
2019-09-13 12:43:22 +00:00
default :
2020-05-19 11:54:51 +00:00
return fmt . Errorf ( "received '%s' during handshake" , msg . Command . String ( ) )
2018-03-14 09:36:59 +00:00
}
}
return nil
2018-01-26 18:04:13 +00:00
}
Implement rpc server method: sendrawtransaction (#174)
* Added new config attributes: 'SecondsPerBlock','LowPriorityThreshold'
* Added new files:
* Added new method: CompareTo
* Fixed empty Slice case
* Added new methods: LessThan, GreaterThan, Equal, CompareTo
* Added new method: InputIntersection
* Added MaxTransactionSize, GroupOutputByAssetID
* Added new method: ScriptHash
* Added new method: IsDoubleSpend
* Refactor blockchainer, Added Feer interface, Verify and GetMemPool method
* 1) Added MemPool
2) Added new methods to satisfy the blockchainer interface: IsLowPriority, Verify, GetMemPool
* Added new methods: RelayTxn, RelayDirectly
* Fixed tests
* Implemented RPC server method sendrawtransaction
* Refactor getrawtransaction, sendrawtransaction in separate methods
* Moved 'secondsPerBlock' to config file
* Implemented Kim suggestions:
1) Fixed data race issues
2) refactor Verify method
3) Get rid of unused InputIntersection method due to refactoring Verify method
4) Fixed bug in https://github.com/CityOfZion/neo-go/pull/174#discussion_r264108135
5) minor simplifications of the code
* Fixed minor issues related to
1) space
2) getter methods do not need pointer on the receiver
3) error message
4) refactoring CompareTo method in uint256.go
* Fixed small issues
* Use sync.RWMutex instead of sync.Mutex
* Refined (R)Lock/(R)Unlock
* return error instead of bool in Verify methods
2019-03-20 12:30:05 +00:00
2021-01-14 13:38:40 +00:00
func ( s * Server ) handleNewPayload ( p * payload . Extensible ) {
2021-01-18 12:52:51 +00:00
_ , err := s . extensiblePool . Add ( p )
if err != nil {
s . log . Error ( "created payload is not valid" , zap . Error ( err ) )
return
}
2021-01-14 13:38:40 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . ExtensibleType , [ ] util . Uint256 { p . Hash ( ) } ) )
2021-02-02 09:34:27 +00:00
switch p . Category {
case consensus . Category :
// It's high priority because it directly affects consensus process,
// even though it's just an inv.
s . broadcastHPMessage ( msg )
default :
s . broadcastMessage ( msg )
}
2019-11-15 10:32:40 +00:00
}
func ( s * Server ) requestTx ( hashes ... util . Uint256 ) {
if len ( hashes ) == 0 {
return
}
2020-12-07 15:40:36 +00:00
for i := 0 ; i <= len ( hashes ) / payload . MaxHashesCount ; i ++ {
2020-09-04 13:04:40 +00:00
start := i * payload . MaxHashesCount
stop := ( i + 1 ) * payload . MaxHashesCount
2020-12-07 15:40:36 +00:00
if stop > len ( hashes ) {
2020-09-04 13:04:40 +00:00
stop = len ( hashes )
}
2020-12-07 15:40:36 +00:00
if start == stop {
break
}
2020-09-04 13:04:40 +00:00
msg := NewMessage ( CMDGetData , payload . NewInventory ( payload . TXType , hashes [ start : stop ] ) )
// It's high priority because it directly affects consensus process,
// even though it's getdata.
s . broadcastHPMessage ( msg )
}
2019-11-15 10:32:40 +00:00
}
2020-01-22 08:01:13 +00:00
// iteratePeersWithSendMsg sends given message to all peers using two functions
// passed, one is to send the message and the other is to filtrate peers (the
// peer is considered invalid if it returns false).
2020-12-22 12:55:55 +00:00
func ( s * Server ) iteratePeersWithSendMsg ( msg * Message , send func ( Peer , bool , [ ] byte ) error , peerOK func ( Peer ) bool ) {
2020-08-31 16:07:28 +00:00
// Get a copy of s.peers to avoid holding a lock while sending.
peers := s . Peers ( )
if len ( peers ) == 0 {
return
}
2020-01-22 08:01:13 +00:00
pkt , err := msg . Bytes ( )
if err != nil {
return
}
2020-12-22 12:55:55 +00:00
success := make ( map [ Peer ] bool , len ( peers ) )
okCount := 0
sentCount := 0
2020-08-31 16:07:28 +00:00
for peer := range peers {
2020-01-22 08:01:13 +00:00
if peerOK != nil && ! peerOK ( peer ) {
2020-12-22 12:55:55 +00:00
success [ peer ] = false
continue
}
okCount ++
if err := send ( peer , false , pkt ) ; err != nil {
continue
}
if msg . Command == CMDGetAddr {
peer . AddGetAddrSent ( )
}
success [ peer ] = true
sentCount ++
}
// Send to at least 2/3 of good peers.
if 3 * sentCount >= 2 * okCount {
return
}
// Perform blocking send now.
for peer := range peers {
if _ , ok := success [ peer ] ; ok || peerOK != nil && ! peerOK ( peer ) {
continue
}
if err := send ( peer , true , pkt ) ; err != nil {
2019-11-15 10:32:40 +00:00
continue
}
2020-11-25 10:34:38 +00:00
if msg . Command == CMDGetAddr {
peer . AddGetAddrSent ( )
}
2020-12-22 12:55:55 +00:00
sentCount ++
if 3 * sentCount >= 2 * okCount {
return
}
2019-11-15 10:32:40 +00:00
}
}
2020-01-22 08:01:13 +00:00
// broadcastMessage sends the message to all available peers. It delegates to
// iteratePeersWithSendMsg with Peer.EnqueuePacket as the send function and no
// peer filter.
func ( s * Server ) broadcastMessage ( msg * Message ) {
s . iteratePeersWithSendMsg ( msg , Peer . EnqueuePacket , nil )
}
// broadcastHPMessage sends the high-priority message to all available peers.
// It delegates to iteratePeersWithSendMsg with Peer.EnqueueHPPacket as the
// send function and no peer filter.
func ( s * Server ) broadcastHPMessage ( msg * Message ) {
s . iteratePeersWithSendMsg ( msg , Peer . EnqueueHPPacket , nil )
}
2020-05-07 20:00:38 +00:00
// relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them
// to the network. Intended to be run as a separate goroutine.
func ( s * Server ) relayBlocksLoop ( ) {
ch := make ( chan * block . Block , 2 ) // Some buffering to smooth out possible egressing delays.
s . chain . SubscribeForBlocks ( ch )
for {
select {
case <- s . quit :
s . chain . UnsubscribeFromBlocks ( ch )
return
case b := <- ch :
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . BlockType , [ ] util . Uint256 { b . Hash ( ) } ) )
// Filter out nodes that are more current (avoid spamming the network
// during initial sync).
s . iteratePeersWithSendMsg ( msg , Peer . EnqueuePacket , func ( p Peer ) bool {
return p . Handshaked ( ) && p . LastBlockIndex ( ) < b . Index
} )
2021-01-18 12:52:51 +00:00
s . extensiblePool . RemoveStale ( b . Index )
2020-05-07 20:00:38 +00:00
}
}
2019-11-29 09:27:15 +00:00
}
2020-01-29 08:56:40 +00:00
// verifyAndPoolTX verifies the TX and adds it to the local mempool.
2021-02-17 11:51:54 +00:00
func ( s * Server ) verifyAndPoolTX ( t * transaction . Transaction ) error {
return s . chain . PoolTx ( t )
2020-01-29 08:56:40 +00:00
}
// RelayTxn a new transaction to the local node and the connected peers.
// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159
2021-02-17 11:51:54 +00:00
func ( s * Server ) RelayTxn ( t * transaction . Transaction ) error {
err := s . verifyAndPoolTX ( t )
if err == nil {
2020-11-27 10:55:48 +00:00
s . broadcastTX ( t , nil )
2020-01-29 08:56:40 +00:00
}
2021-02-17 11:51:54 +00:00
return err
2020-01-29 08:56:40 +00:00
}
Implement rpc server method: sendrawtransaction (#174)
* Added new config attributes: 'SecondsPerBlock','LowPriorityThreshold'
* Added new files:
* Added new method: CompareTo
* Fixed empty Slice case
* Added new methods: LessThan, GreaterThan, Equal, CompareTo
* Added new method: InputIntersection
* Added MaxTransactionSize, GroupOutputByAssetID
* Added new method: ScriptHash
* Added new method: IsDoubleSpend
* Refactor blockchainer, Added Feer interface, Verify and GetMemPool method
* 1) Added MemPool
2) Added new methods to satisfy the blockchainer interface: IsLowPriority, Verify, GetMemPool
* Added new methods: RelayTxn, RelayDirectly
* Fixed tests
* Implemented RPC server method sendrawtransaction
* Refactor getrawtransaction, sendrawtransaction in separate methods
* Moved 'secondsPerBlock' to config file
* Implemented Kim suggestions:
1) Fixed data race issues
2) refactor Verify method
3) Get rid of unused InputIntersection method due to refactoring Verify method
4) Fixed bug in https://github.com/CityOfZion/neo-go/pull/174#discussion_r264108135
5) minor simplifications of the code
* Fixed minor issues related to
1) space
2) getter methods do not need pointer on the receiver
3) error message
4) refactoring CompareTo method in uint256.go
* Fixed small issues
* Use sync.RWMutex instead of sync.Mutex
* Refined (R)Lock/(R)Unlock
* return error instead of bool in Verify methods
2019-03-20 12:30:05 +00:00
2020-01-29 08:56:40 +00:00
// broadcastTX broadcasts an inventory message about new transaction.
2020-11-27 10:55:48 +00:00
func ( s * Server ) broadcastTX ( t * transaction . Transaction , _ interface { } ) {
2020-02-18 15:11:55 +00:00
select {
case s . transactions <- t :
case <- s . quit :
}
}
func ( s * Server ) broadcastTxHashes ( hs [ ] util . Uint256 ) {
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . TXType , hs ) )
2020-01-22 08:01:13 +00:00
// We need to filter out non-relaying nodes, so plain broadcast
// functions don't fit here.
2020-05-22 09:17:17 +00:00
s . iteratePeersWithSendMsg ( msg , Peer . EnqueuePacket , Peer . IsFullNode )
Implement rpc server method: sendrawtransaction (#174)
* Added new config attributes: 'SecondsPerBlock','LowPriorityThreshold'
* Added new files:
* Added new method: CompareTo
* Fixed empty Slice case
* Added new methods: LessThan, GreaterThan, Equal, CompareTo
* Added new method: InputIntersection
* Added MaxTransactionSize, GroupOutputByAssetID
* Added ned method: ScriptHash
* Added new method: IsDoubleSpend
* Refactor blockchainer, Added Feer interface, Verify and GetMemPool method
* 1) Added MemPool
2) Added new methods to satisfy the blockchainer interface: IsLowPriority, Verify, GetMemPool
* Added new methods: RelayTxn, RelayDirectly
* Fixed tests
* Implemented RPC server method sendrawtransaction
* Refactor getrawtransaction, sendrawtransaction in separate methods
* Moved 'secondsPerBlock' to config file
* Implemented Kim suggestions:
1) Fixed data race issues
2) refactor Verify method
3) Get rid of unused InputIntersection method due to refactoring Verify method
4) Fixed bug in https://github.com/CityOfZion/neo-go/pull/174#discussion_r264108135
5) minor simplications of the code
* Fixed minor issues related to
1) space
2) getter methods do not need pointer on the receiver
3) error message
4) refactoring CompareTo method in uint256.go
* Fixed small issues
* Use sync.RWMutex instead of sync.Mutex
* Refined (R)Lock/(R)Unlock
* return error instead of bool in Verify methods
2019-03-20 12:30:05 +00:00
}
2020-02-18 15:11:55 +00:00
2020-11-27 10:55:48 +00:00
// initStaleMemPools initializes mempools for stale tx/payload processing.
func ( s * Server ) initStaleMemPools ( ) {
2020-11-11 12:49:51 +00:00
cfg := s . chain . GetConfig ( )
threshold := 5
if cfg . ValidatorsCount * 2 > threshold {
threshold = cfg . ValidatorsCount * 2
}
mp := s . chain . GetMemPool ( )
mp . SetResendThreshold ( uint32 ( threshold ) , s . broadcastTX )
2020-11-27 10:55:48 +00:00
if s . chain . P2PSigExtensionsEnabled ( ) {
s . notaryRequestPool . SetResendThreshold ( uint32 ( threshold ) , s . broadcastP2PNotaryRequestPayload )
}
2020-11-11 12:49:51 +00:00
}
2020-02-18 15:11:55 +00:00
// broadcastTxLoop batches hashes of the transactions arriving on
// s.transactions and announces them in inv payloads. A batch is sent when it
// reaches batchSize hashes or when batchTime has passed since its first hash
// was added, whichever comes first. When s.quit fires, the transactions
// already queued on s.transactions are discarded and the loop returns.
func (s *Server) broadcastTxLoop() {
	const (
		batchTime = time.Millisecond * 50
		batchSize = 32
	)

	var (
		pending = make([]util.Uint256, 0, batchSize)
		timer   *time.Timer
	)
	flush := func() {
		s.broadcastTxHashes(pending)
		pending = pending[:0]
		if timer != nil {
			timer.Stop()
		}
	}
	for {
		// Until the first timer is created this stays nil; receiving from
		// a nil channel blocks forever, so that case is never selected.
		var expired <-chan time.Time
		if timer != nil {
			expired = timer.C
		}
		select {
		case <-s.quit:
			// Drain whatever is queued without blocking, then exit.
			for {
				select {
				case <-s.transactions:
					continue
				default:
				}
				return
			}
		case <-expired:
			if len(pending) > 0 {
				flush()
			}
		case tx := <-s.transactions:
			if len(pending) == 0 {
				// The first hash of a batch starts the batch timer.
				timer = time.NewTimer(batchTime)
			}
			pending = append(pending, tx.Hash())
			if len(pending) == batchSize {
				flush()
			}
		}
	}
}
2020-06-10 07:01:21 +00:00
2021-04-30 08:24:43 +00:00
// Port returns a server port that should be used in P2P version exchange. In
// case if `AnnouncedPort` is set in the server.Config, the announced node port
// will be returned (e.g. consider the node running behind NAT). If `AnnouncedPort`
// isn't set, the port returned may still differs from that of server.Config.
2020-06-10 07:01:21 +00:00
func ( s * Server ) Port ( ) ( uint16 , error ) {
2021-04-30 08:24:43 +00:00
if s . AnnouncedPort != 0 {
return s . ServerConfig . AnnouncedPort , nil
}
2020-06-10 07:01:21 +00:00
var port uint16
_ , portStr , err := net . SplitHostPort ( s . transport . Address ( ) )
if err != nil {
port = s . ServerConfig . Port
} else {
p , err := strconv . ParseUint ( portStr , 10 , 16 )
if err != nil {
return 0 , err
}
port = uint16 ( p )
}
return port , nil
}