2018-01-26 18:04:13 +00:00
package network
import (
2019-11-15 17:42:23 +00:00
"crypto/rand"
"encoding/binary"
2018-03-14 09:36:59 +00:00
"errors"
2018-01-26 18:04:13 +00:00
"fmt"
2022-01-14 01:09:54 +00:00
"math/big"
2020-11-26 15:53:10 +00:00
mrand "math/rand"
2019-09-13 17:38:34 +00:00
"net"
2021-07-30 13:57:42 +00:00
"sort"
2019-10-29 17:51:17 +00:00
"strconv"
2018-03-14 09:36:59 +00:00
"sync"
2018-01-28 10:12:05 +00:00
"time"
2018-01-27 15:00:28 +00:00
2022-01-14 01:09:54 +00:00
"github.com/nspcc-dev/neo-go/pkg/config"
2020-06-18 09:00:51 +00:00
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/block"
2020-11-27 10:55:48 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
2021-05-28 11:55:06 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
2021-07-30 13:57:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
2021-07-30 13:57:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/io"
2020-05-22 09:17:17 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/capability"
2021-01-18 12:52:51 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/util"
2019-11-15 10:32:40 +00:00
"go.uber.org/atomic"
2019-12-30 07:43:05 +00:00
"go.uber.org/zap"
2018-01-26 18:04:13 +00:00
)
// Defaults and limits used by Server. The peer numbers are arbitrary at the
// moment.
const (
	// defaultMinPeers replaces a negative MinPeers setting.
	defaultMinPeers = 5
	// defaultAttemptConnPeers replaces a non-positive AttemptConnPeers setting.
	defaultAttemptConnPeers = 20
	// defaultMaxPeers replaces a non-positive MaxPeers setting.
	defaultMaxPeers = 100
	// defaultExtensiblePoolSize replaces a non-positive ExtensiblePoolSize setting.
	defaultExtensiblePoolSize = 20
	// maxBlockBatch is the first argument given to newBlockQueue for both
	// block queues.
	maxBlockBatch = 200
	// minPoolCount is the discovery pool size below which the server
	// broadcasts a getaddr request.
	minPoolCount = 30
)
2018-03-14 09:36:59 +00:00
// Sentinel errors used as peer drop reasons and message validation results.
var (
	// errAlreadyConnected is returned when a peer with the same nonce and
	// address is already present in the peer set.
	errAlreadyConnected = errors.New("already connected")
	// errIdenticalID is returned when the peer's nonce equals our own ID.
	errIdenticalID = errors.New("identical node id")
	// errInvalidNetwork is returned when the peer's magic differs from ours.
	errInvalidNetwork = errors.New("invalid network")
	// errMaxPeers is the reason used when dropping a peer above MaxPeers.
	errMaxPeers = errors.New("max peers reached")
	// errServerShutdown is the reason used when disconnecting peers on Shutdown.
	errServerShutdown = errors.New("server shutdown")
	// errInvalidInvType signals an inventory of an unsupported type.
	errInvalidInvType = errors.New("invalid inventory type")
)
2018-02-01 20:28:45 +00:00
2018-03-14 09:36:59 +00:00
type (
2022-01-14 01:09:54 +00:00
// Ledger is everything Server needs from the blockchain.
Ledger interface {
extpool . Ledger
mempool . Feer
Blockqueuer
GetBlock ( hash util . Uint256 ) ( * block . Block , error )
GetConfig ( ) config . ProtocolConfiguration
GetHeader ( hash util . Uint256 ) ( * block . Header , error )
GetHeaderHash ( int ) util . Uint256
GetMaxVerificationGAS ( ) int64
GetMemPool ( ) * mempool . Pool
GetNotaryBalance ( acc util . Uint160 ) * big . Int
GetNotaryContractScriptHash ( ) util . Uint160
GetNotaryDepositExpiration ( acc util . Uint160 ) uint32
GetTransaction ( util . Uint256 ) ( * transaction . Transaction , uint32 , error )
HasBlock ( util . Uint256 ) bool
HeaderHeight ( ) uint32
P2PSigExtensionsEnabled ( ) bool
PoolTx ( t * transaction . Transaction , pools ... * mempool . Pool ) error
PoolTxWithData ( t * transaction . Transaction , data interface { } , mp * mempool . Pool , feer mempool . Feer , verificationFunction func ( t * transaction . Transaction , data interface { } ) error ) error
RegisterPostBlock ( f func ( func ( * transaction . Transaction , * mempool . Pool , bool ) bool , * mempool . Pool , * block . Block ) )
SubscribeForBlocks ( ch chan <- * block . Block )
UnsubscribeFromBlocks ( ch chan <- * block . Block )
}
2022-01-12 01:11:21 +00:00
// Service is a service abstraction (oracle, state root, consensus, etc).
Service interface {
Start ( )
Shutdown ( )
}
2018-03-14 09:36:59 +00:00
// Server represents the local Node in the network. Its transport could
// be of any kind.
Server struct {
2018-03-15 20:45:37 +00:00
// ServerConfig holds the Server configuration.
ServerConfig
2018-02-06 06:43:32 +00:00
2018-03-23 20:36:59 +00:00
// id also known as the nonce of the server.
2018-03-14 09:36:59 +00:00
id uint32
2018-01-26 18:04:13 +00:00
2020-06-18 09:00:51 +00:00
// Network's magic number for correct message decoding.
network netmode . Magic
2020-11-17 12:57:50 +00:00
// stateRootInHeader specifies if block header contain state root.
stateRootInHeader bool
2020-06-18 09:00:51 +00:00
2020-11-27 10:55:48 +00:00
transport Transporter
discovery Discoverer
2022-01-14 01:09:54 +00:00
chain Ledger
2020-11-27 10:55:48 +00:00
bQueue * blockQueue
2021-07-30 13:57:42 +00:00
bSyncQueue * blockQueue
2021-08-03 19:28:16 +00:00
mempool * mempool . Pool
2020-11-27 10:55:48 +00:00
notaryRequestPool * mempool . Pool
2021-01-18 12:52:51 +00:00
extensiblePool * extpool . Pool
2020-12-30 08:01:13 +00:00
notaryFeer NotaryFeer
2022-01-12 01:11:21 +00:00
services [ ] Service
2022-01-12 18:09:37 +00:00
extensHandlers map [ string ] func ( * payload . Extensible ) error
2022-01-12 20:04:07 +00:00
extensHighPrio string
txCallback func ( * transaction . Transaction )
2018-01-26 18:04:13 +00:00
2021-08-03 19:43:31 +00:00
txInLock sync . Mutex
txInMap map [ util . Uint256 ] struct { }
2018-03-14 09:36:59 +00:00
lock sync . RWMutex
peers map [ Peer ] bool
2018-01-26 20:39:34 +00:00
2021-09-27 13:13:37 +00:00
// lastRequestedBlock contains a height of the last requested block.
lastRequestedBlock atomic . Uint32
// lastRequestedHeader contains a height of the last requested header.
lastRequestedHeader atomic . Uint32
2020-11-26 15:53:10 +00:00
2018-03-14 09:36:59 +00:00
register chan Peer
unregister chan peerDrop
quit chan struct { }
2019-11-15 10:32:40 +00:00
2020-02-18 15:11:55 +00:00
transactions chan * transaction . Transaction
2021-04-02 09:55:56 +00:00
syncReached * atomic . Bool
2019-12-30 07:43:05 +00:00
2022-01-12 21:20:03 +00:00
stateSync StateSync
2020-09-28 11:58:04 +00:00
2019-12-30 07:43:05 +00:00
log * zap . Logger
2018-03-14 09:36:59 +00:00
}
2018-03-10 12:04:06 +00:00
2018-03-14 09:36:59 +00:00
peerDrop struct {
peer Peer
reason error
2018-02-01 08:00:42 +00:00
}
2018-03-14 09:36:59 +00:00
)
2019-11-15 17:42:23 +00:00
// randomID produces a 32-bit identifier from four bytes read from
// crypto/rand, interpreted as a big-endian integer. The result of the read
// is deliberately ignored.
func randomID() uint32 {
	var raw [4]byte
	_, _ = rand.Read(raw[:])
	return binary.BigEndian.Uint32(raw[:])
}
2018-03-14 09:36:59 +00:00
// NewServer returns a new Server, initialized with the given configuration.
2022-01-14 01:09:54 +00:00
func NewServer ( config ServerConfig , chain Ledger , stSync StateSync , log * zap . Logger ) ( * Server , error ) {
2022-01-12 21:20:03 +00:00
return newServerFromConstructors ( config , chain , stSync , log , func ( s * Server ) Transporter {
2020-12-07 09:52:19 +00:00
return NewTCPTransport ( s , net . JoinHostPort ( s . ServerConfig . Address , strconv . Itoa ( int ( s . ServerConfig . Port ) ) ) , s . log )
2022-01-12 20:04:07 +00:00
} , newDefaultDiscovery )
2020-12-07 09:52:19 +00:00
}
2022-01-14 01:09:54 +00:00
func newServerFromConstructors ( config ServerConfig , chain Ledger , stSync StateSync , log * zap . Logger ,
2020-12-07 09:52:19 +00:00
newTransport func ( * Server ) Transporter ,
newDiscovery func ( [ ] string , time . Duration , Transporter ) Discoverer ,
) ( * Server , error ) {
2019-12-30 07:43:05 +00:00
if log == nil {
2020-01-22 08:17:51 +00:00
return nil , errors . New ( "logger is a required parameter" )
2019-12-30 07:43:05 +00:00
}
2021-05-04 14:54:16 +00:00
if config . ExtensiblePoolSize <= 0 {
config . ExtensiblePoolSize = defaultExtensiblePoolSize
log . Info ( "ExtensiblePoolSize is not set or wrong, using default value" ,
zap . Int ( "ExtensiblePoolSize" , config . ExtensiblePoolSize ) )
}
2018-03-09 15:55:25 +00:00
s := & Server {
2020-11-17 12:57:50 +00:00
ServerConfig : config ,
chain : chain ,
id : randomID ( ) ,
network : chain . GetConfig ( ) . Magic ,
stateRootInHeader : chain . GetConfig ( ) . StateRootInHeader ,
quit : make ( chan struct { } ) ,
register : make ( chan Peer ) ,
unregister : make ( chan peerDrop ) ,
2021-08-03 19:43:31 +00:00
txInMap : make ( map [ util . Uint256 ] struct { } ) ,
2020-11-17 12:57:50 +00:00
peers : make ( map [ Peer ] bool ) ,
2021-04-02 09:55:56 +00:00
syncReached : atomic . NewBool ( false ) ,
2021-08-03 19:28:16 +00:00
mempool : chain . GetMemPool ( ) ,
2021-05-04 14:54:16 +00:00
extensiblePool : extpool . New ( chain , config . ExtensiblePoolSize ) ,
2020-11-17 12:57:50 +00:00
log : log ,
transactions : make ( chan * transaction . Transaction , 64 ) ,
2022-01-12 18:09:37 +00:00
extensHandlers : make ( map [ string ] func ( * payload . Extensible ) error ) ,
2022-01-12 21:20:03 +00:00
stateSync : stSync ,
2018-01-26 18:04:13 +00:00
}
2020-11-27 10:55:48 +00:00
if chain . P2PSigExtensionsEnabled ( ) {
2020-12-30 08:01:13 +00:00
s . notaryFeer = NewNotaryFeer ( chain )
2021-05-28 11:55:06 +00:00
s . notaryRequestPool = mempool . New ( chain . GetConfig ( ) . P2PNotaryRequestPayloadPoolSize , 1 , true )
2022-01-14 01:09:54 +00:00
chain . RegisterPostBlock ( func ( isRelevant func ( * transaction . Transaction , * mempool . Pool , bool ) bool , txpool * mempool . Pool , _ * block . Block ) {
2020-11-27 10:55:48 +00:00
s . notaryRequestPool . RemoveStale ( func ( t * transaction . Transaction ) bool {
2022-01-14 01:09:54 +00:00
return isRelevant ( t , txpool , true )
2020-12-30 08:01:13 +00:00
} , s . notaryFeer )
2020-11-27 10:55:48 +00:00
} )
}
2020-02-14 17:46:05 +00:00
s . bQueue = newBlockQueue ( maxBlockBatch , chain , log , func ( b * block . Block ) {
2021-06-04 17:29:47 +00:00
s . tryStartServices ( )
2020-02-14 17:46:05 +00:00
} )
2018-01-26 18:04:13 +00:00
2022-01-12 21:20:03 +00:00
s . bSyncQueue = newBlockQueue ( maxBlockBatch , s . stateSync , log , nil )
2021-07-30 13:57:42 +00:00
2020-01-13 12:22:21 +00:00
if s . MinPeers < 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad MinPeers configured, using the default value" ,
zap . Int ( "configured" , s . MinPeers ) ,
zap . Int ( "actual" , defaultMinPeers ) )
2019-11-01 10:29:54 +00:00
s . MinPeers = defaultMinPeers
}
2019-11-06 12:17:20 +00:00
if s . MaxPeers <= 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad MaxPeers configured, using the default value" ,
zap . Int ( "configured" , s . MaxPeers ) ,
zap . Int ( "actual" , defaultMaxPeers ) )
2019-11-06 12:17:20 +00:00
s . MaxPeers = defaultMaxPeers
}
if s . AttemptConnPeers <= 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad AttemptConnPeers configured, using the default value" ,
zap . Int ( "configured" , s . AttemptConnPeers ) ,
zap . Int ( "actual" , defaultAttemptConnPeers ) )
2019-11-06 12:17:20 +00:00
s . AttemptConnPeers = defaultAttemptConnPeers
}
2020-12-07 09:52:19 +00:00
s . transport = newTransport ( s )
s . discovery = newDiscovery (
2020-10-13 13:30:10 +00:00
s . Seeds ,
2018-03-14 09:36:59 +00:00
s . DialTimeout ,
s . transport ,
)
2018-01-26 18:04:13 +00:00
2020-01-22 08:17:51 +00:00
return s , nil
2018-01-30 10:56:36 +00:00
}
2018-03-23 20:36:59 +00:00
// ID returns the server's identifier (the nonce generated at construction).
func (s *Server) ID() uint32 {
	nonce := s.id
	return nonce
}
2018-03-14 09:36:59 +00:00
// Start will start the server and its underlying transport.
2018-03-23 20:36:59 +00:00
func ( s * Server ) Start ( errChan chan error ) {
2019-12-30 07:43:05 +00:00
s . log . Info ( "node started" ,
zap . Uint32 ( "blockHeight" , s . chain . BlockHeight ( ) ) ,
zap . Uint32 ( "headerHeight" , s . chain . HeaderHeight ( ) ) )
2018-03-17 11:53:21 +00:00
2021-04-02 09:55:56 +00:00
s . tryStartServices ( )
2020-11-27 10:55:48 +00:00
s . initStaleMemPools ( )
2020-01-13 12:22:21 +00:00
2020-02-18 15:11:55 +00:00
go s . broadcastTxLoop ( )
2020-05-07 20:00:38 +00:00
go s . relayBlocksLoop ( )
2019-09-25 16:54:31 +00:00
go s . bQueue . run ( )
2021-07-30 13:57:42 +00:00
go s . bSyncQueue . run ( )
2018-03-14 09:36:59 +00:00
go s . transport . Accept ( )
2019-10-29 17:51:17 +00:00
setServerAndNodeVersions ( s . UserAgent , strconv . FormatUint ( uint64 ( s . id ) , 10 ) )
2018-03-14 09:36:59 +00:00
s . run ( )
2018-01-31 19:11:08 +00:00
}
2018-01-30 10:56:36 +00:00
2018-03-23 20:36:59 +00:00
// Shutdown disconnects all peers and stops listening.
func ( s * Server ) Shutdown ( ) {
2019-12-30 07:43:05 +00:00
s . log . Info ( "shutting down server" , zap . Int ( "peers" , s . PeerCount ( ) ) )
2020-02-24 12:54:18 +00:00
s . transport . Close ( )
2020-02-24 12:39:31 +00:00
s . discovery . Close ( )
2021-08-06 12:04:13 +00:00
for _ , p := range s . getPeers ( nil ) {
2020-02-24 12:54:18 +00:00
p . Disconnect ( errServerShutdown )
}
2019-09-25 16:54:31 +00:00
s . bQueue . discard ( )
2021-07-30 13:57:42 +00:00
s . bSyncQueue . discard ( )
2022-01-12 01:11:21 +00:00
for _ , svc := range s . services {
svc . Shutdown ( )
2021-05-28 11:55:06 +00:00
}
if s . chain . P2PSigExtensionsEnabled ( ) {
2021-01-15 12:40:15 +00:00
s . notaryRequestPool . StopSubscriptions ( )
}
2018-03-23 20:36:59 +00:00
close ( s . quit )
}
2022-01-12 02:01:34 +00:00
// AddService allows to add a service to be started/stopped by Server.
func ( s * Server ) AddService ( svc Service ) {
s . services = append ( s . services , svc )
2020-09-28 11:58:04 +00:00
}
2022-01-12 18:09:37 +00:00
// AddExtensibleService register a service that handles extensible payload of some kind.
func ( s * Server ) AddExtensibleService ( svc Service , category string , handler func ( * payload . Extensible ) error ) {
s . extensHandlers [ category ] = handler
s . AddService ( svc )
2021-02-01 16:00:07 +00:00
}
2022-01-12 20:04:07 +00:00
// AddExtensibleHPService registers a high-priority extensible payload
// service: category is remembered as the high-priority one, txCallback is
// stored as the transaction callback and the service with its handler is
// registered the same way AddExtensibleService does it.
func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) {
	s.txCallback, s.extensHighPrio = txCallback, category
	s.AddExtensibleService(svc, category, handler)
}
2022-01-12 20:21:09 +00:00
// GetNotaryPool returns the notary request pool. The pool is only created
// when P2PSigExtensions are enabled, so the result is nil otherwise.
func (s *Server) GetNotaryPool() *mempool.Pool {
	pool := s.notaryRequestPool
	return pool
}
2018-04-09 16:58:09 +00:00
// UnconnectedPeers returns a list of peers that are in the discovery peer list
// but are not connected to the server.
2018-03-23 20:36:59 +00:00
func ( s * Server ) UnconnectedPeers ( ) [ ] string {
2020-01-10 12:16:14 +00:00
return s . discovery . UnconnectedPeers ( )
2018-03-23 20:36:59 +00:00
}
2018-04-09 16:58:09 +00:00
// BadPeers returns a list of peers the are flagged as "bad" peers.
2018-03-23 20:36:59 +00:00
func ( s * Server ) BadPeers ( ) [ ] string {
2020-01-10 12:16:14 +00:00
return s . discovery . BadPeers ( )
2018-03-23 20:36:59 +00:00
}
2020-01-10 12:13:29 +00:00
// ConnectedPeers returns the addresses (as reported by PeerAddr) of all
// peers currently present in the peer set. The peer set is read under the
// read lock.
func (s *Server) ConnectedPeers() []string {
	s.lock.RLock()
	defer s.lock.RUnlock()

	addrs := make([]string, 0, len(s.peers))
	for connected := range s.peers {
		addr := connected.PeerAddr().String()
		addrs = append(addrs, addr)
	}
	return addrs
}
2020-01-27 09:44:05 +00:00
// run is a goroutine that starts another goroutine to manage protocol specifics
// while itself dealing with peers management (handling connects/disconnects).
2018-03-14 09:36:59 +00:00
func ( s * Server ) run ( ) {
2020-01-27 09:44:05 +00:00
go s . runProto ( )
2018-03-09 15:55:25 +00:00
for {
2019-11-06 12:17:20 +00:00
if s . PeerCount ( ) < s . MinPeers {
s . discovery . RequestRemote ( s . AttemptConnPeers )
2019-09-12 13:19:18 +00:00
}
2019-09-13 09:03:07 +00:00
if s . discovery . PoolCount ( ) < minPoolCount {
2020-05-21 10:35:44 +00:00
s . broadcastHPMessage ( NewMessage ( CMDGetAddr , payload . NewNullPayload ( ) ) )
2019-09-13 09:03:07 +00:00
}
2018-03-14 09:36:59 +00:00
select {
case <- s . quit :
return
case p := <- s . register :
2019-11-06 09:38:47 +00:00
s . lock . Lock ( )
2018-03-14 09:36:59 +00:00
s . peers [ p ] = true
2019-11-06 09:38:47 +00:00
s . lock . Unlock ( )
2019-11-06 12:17:20 +00:00
peerCount := s . PeerCount ( )
2020-01-28 16:39:12 +00:00
s . log . Info ( "new peer connected" , zap . Stringer ( "addr" , p . RemoteAddr ( ) ) , zap . Int ( "peerCount" , peerCount ) )
2019-11-06 12:17:20 +00:00
if peerCount > s . MaxPeers {
s . lock . RLock ( )
// Pick a random peer and drop connection to it.
for peer := range s . peers {
2020-02-24 09:39:46 +00:00
// It will send us unregister signal.
go peer . Disconnect ( errMaxPeers )
2019-11-06 12:17:20 +00:00
break
}
s . lock . RUnlock ( )
}
2019-10-29 17:51:17 +00:00
updatePeersConnectedMetric ( s . PeerCount ( ) )
2018-03-14 09:36:59 +00:00
case drop := <- s . unregister :
2019-11-06 09:38:47 +00:00
s . lock . Lock ( )
2019-09-13 12:36:53 +00:00
if s . peers [ drop . peer ] {
delete ( s . peers , drop . peer )
2019-11-06 09:38:47 +00:00
s . lock . Unlock ( )
2019-12-30 07:43:05 +00:00
s . log . Warn ( "peer disconnected" ,
zap . Stringer ( "addr" , drop . peer . RemoteAddr ( ) ) ,
2021-11-01 09:19:00 +00:00
zap . Error ( drop . reason ) ,
2019-12-30 07:43:05 +00:00
zap . Int ( "peerCount" , s . PeerCount ( ) ) )
2019-11-06 07:55:21 +00:00
addr := drop . peer . PeerAddr ( ) . String ( )
2019-11-27 08:56:56 +00:00
if drop . reason == errIdenticalID {
s . discovery . RegisterBadAddr ( addr )
2020-12-23 13:13:57 +00:00
} else if drop . reason == errAlreadyConnected {
// There is a race condition when peer can be disconnected twice for the this reason
// which can lead to no connections to peer at all. Here we check for such a possibility.
stillConnected := false
s . lock . RLock ( )
verDrop := drop . peer . Version ( )
addr := drop . peer . PeerAddr ( ) . String ( )
if verDrop != nil {
for peer := range s . peers {
ver := peer . Version ( )
// Already connected, drop this connection.
if ver != nil && ver . Nonce == verDrop . Nonce && peer . PeerAddr ( ) . String ( ) == addr {
stillConnected = true
}
}
}
s . lock . RUnlock ( )
if ! stillConnected {
s . discovery . UnregisterConnectedAddr ( addr )
s . discovery . BackFill ( addr )
}
} else {
2019-11-27 08:56:56 +00:00
s . discovery . UnregisterConnectedAddr ( addr )
s . discovery . BackFill ( addr )
}
2019-10-29 17:51:17 +00:00
updatePeersConnectedMetric ( s . PeerCount ( ) )
2019-11-06 09:38:47 +00:00
} else {
// else the peer is already gone, which can happen
// because we have two goroutines sending signals here
s . lock . Unlock ( )
2019-09-13 12:36:53 +00:00
}
2018-03-09 15:55:25 +00:00
}
2018-01-31 19:11:08 +00:00
}
2018-01-27 12:39:07 +00:00
}
2020-01-27 09:44:05 +00:00
// runProto is a goroutine that manages server-wide protocol events.
func ( s * Server ) runProto ( ) {
pingTimer := time . NewTimer ( s . PingInterval )
for {
prevHeight := s . chain . BlockHeight ( )
select {
case <- s . quit :
return
case <- pingTimer . C :
if s . chain . BlockHeight ( ) == prevHeight {
// Get a copy of s.peers to avoid holding a lock while sending.
2021-08-06 12:04:13 +00:00
for _ , peer := range s . getPeers ( nil ) {
2021-08-06 08:26:19 +00:00
_ = peer . SendPing ( NewMessage ( CMDPing , payload . NewPing ( s . chain . BlockHeight ( ) , s . id ) ) )
2020-01-27 09:44:05 +00:00
}
}
pingTimer . Reset ( s . PingInterval )
}
}
}
2021-04-02 09:55:56 +00:00
func ( s * Server ) tryStartServices ( ) {
if s . syncReached . Load ( ) {
2019-11-15 10:32:40 +00:00
return
}
2021-04-02 09:55:56 +00:00
if s . IsInSync ( ) && s . syncReached . CAS ( false , true ) {
s . log . Info ( "node reached synchronized state, starting services" )
2021-05-28 11:55:06 +00:00
if s . chain . P2PSigExtensionsEnabled ( ) {
s . notaryRequestPool . RunSubscriptions ( ) // WSClient is also a subscriber.
}
2022-01-12 01:11:21 +00:00
for _ , svc := range s . services {
svc . Start ( )
2021-04-02 10:12:06 +00:00
}
2019-11-15 10:32:40 +00:00
}
}
2021-05-28 11:55:06 +00:00
// SubscribeForNotaryRequests subscribes ch to notary request pool events:
// you'll get an event via this channel when a new P2PNotaryRequest is
// received or an existing one is removed from the pool. Make sure the
// channel is read from regularly as not reading these events might affect
// other Server functions.
// It panics if P2PSigExtensions are disabled, so ensure they're enabled
// before calling this method.
func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) {
	if enabled := s.chain.P2PSigExtensionsEnabled(); !enabled {
		panic("P2PSigExtensions are disabled")
	}
	s.notaryRequestPool.SubscribeForTransactions(ch)
}
// UnsubscribeFromNotaryRequests removes ch from notary request pool event
// subscribers, the channel can be closed afterwards. Passing a channel that
// was never subscribed is a no-op.
// It panics if P2PSigExtensions are disabled, so ensure they're enabled
// before calling this method.
func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) {
	if enabled := s.chain.P2PSigExtensionsEnabled(); !enabled {
		panic("P2PSigExtensions are disabled")
	}
	s.notaryRequestPool.UnsubscribeFromTransactions(ch)
}
2021-08-06 12:04:13 +00:00
// getPeers returns current list of peers connected to the server filtered by
// isOK function if it's given.
func ( s * Server ) getPeers ( isOK func ( Peer ) bool ) [ ] Peer {
2019-11-15 10:32:40 +00:00
s . lock . RLock ( )
defer s . lock . RUnlock ( )
2021-08-05 20:59:53 +00:00
peers := make ( [ ] Peer , 0 , len ( s . peers ) )
for k := range s . peers {
2021-08-06 12:04:13 +00:00
if isOK != nil && ! isOK ( k ) {
continue
}
2021-08-05 20:59:53 +00:00
peers = append ( peers , k )
2019-11-15 10:32:40 +00:00
}
return peers
2018-03-23 20:36:59 +00:00
}
2018-03-14 09:36:59 +00:00
// PeerCount returns the number of current connected peers.
func ( s * Server ) PeerCount ( ) int {
s . lock . RLock ( )
defer s . lock . RUnlock ( )
return len ( s . peers )
2018-02-01 20:28:45 +00:00
}
2019-12-02 07:51:45 +00:00
// HandshakedPeersCount returns the number of connected peers that report
// a completed handshake.
func (s *Server) HandshakedPeersCount() int {
	s.lock.RLock()
	defer s.lock.RUnlock()

	handshaked := 0
	for peer := range s.peers {
		if !peer.Handshaked() {
			continue
		}
		handshaked++
	}
	return handshaked
}
2018-02-01 20:28:45 +00:00
2020-01-21 14:26:08 +00:00
// getVersionMsg returns current version message.
2020-05-22 09:17:17 +00:00
func ( s * Server ) getVersionMsg ( ) ( * Message , error ) {
2020-06-10 07:01:21 +00:00
port , err := s . Port ( )
2020-05-22 09:17:17 +00:00
if err != nil {
2020-06-10 07:01:21 +00:00
return nil , err
2020-05-22 09:17:17 +00:00
}
capabilities := [ ] capability . Capability {
{
Type : capability . TCPServer ,
Data : & capability . Server {
Port : port ,
} ,
} ,
}
if s . Relay {
capabilities = append ( capabilities , capability . Capability {
Type : capability . FullNode ,
Data : & capability . Node {
StartHeight : s . chain . BlockHeight ( ) ,
} ,
} )
}
2018-03-14 09:36:59 +00:00
payload := payload . NewVersion (
2020-05-21 10:35:44 +00:00
s . Net ,
2018-03-14 09:36:59 +00:00
s . id ,
s . UserAgent ,
2020-05-22 09:17:17 +00:00
capabilities ,
2018-03-09 15:55:25 +00:00
)
2020-05-22 09:17:17 +00:00
return NewMessage ( CMDVersion , payload ) , nil
2018-03-14 09:36:59 +00:00
}
2018-03-09 15:55:25 +00:00
2020-02-14 17:46:05 +00:00
// IsInSync answers the question of whether the server is in sync with the
// network or not (at least how the server itself sees it). The server operates
// with the data that it has, the number of peers (that has to be more than
// minimum number) and height of these peers (our chain has to be not lower
// than 2/3 of our peers have). Ideally we would check for the highest of the
// peers, but the problem is that they can lie to us and send whatever height
// they want to.
func ( s * Server ) IsInSync ( ) bool {
var peersNumber int
var notHigher int
2021-07-30 13:57:42 +00:00
if s . stateSync . IsActive ( ) {
return false
}
2020-02-14 17:46:05 +00:00
if s . MinPeers == 0 {
return true
}
ourLastBlock := s . chain . BlockHeight ( )
s . lock . RLock ( )
for p := range s . peers {
if p . Handshaked ( ) {
peersNumber ++
if ourLastBlock >= p . LastBlockIndex ( ) {
notHigher ++
}
}
}
s . lock . RUnlock ( )
// Checking bQueue would also be nice, but it can be filled with garbage
// easily at the moment.
return peersNumber >= s . MinPeers && ( 3 * notHigher > 2 * peersNumber ) // && s.bQueue.length() == 0
}
2018-03-14 09:36:59 +00:00
// When a peer sends out his version we reply with verack after validating
// the version.
func ( s * Server ) handleVersionCmd ( p Peer , version * payload . Version ) error {
2019-09-13 12:43:22 +00:00
err := p . HandleVersion ( version )
if err != nil {
return err
2018-03-14 09:36:59 +00:00
}
if s . id == version . Nonce {
return errIdenticalID
2018-01-28 13:59:32 +00:00
}
2020-05-21 10:35:44 +00:00
// Make sure both server and peer are operating on
// the same network.
if s . Net != version . Magic {
return errInvalidNetwork
}
2019-11-06 09:39:17 +00:00
peerAddr := p . PeerAddr ( ) . String ( )
2020-01-28 16:10:13 +00:00
s . discovery . RegisterConnectedAddr ( peerAddr )
2019-11-06 09:39:17 +00:00
s . lock . RLock ( )
for peer := range s . peers {
2020-01-28 10:54:09 +00:00
if p == peer {
continue
}
ver := peer . Version ( )
2019-11-06 09:39:17 +00:00
// Already connected, drop this connection.
2020-01-28 10:54:09 +00:00
if ver != nil && ver . Nonce == version . Nonce && peer . PeerAddr ( ) . String ( ) == peerAddr {
2019-11-06 09:39:17 +00:00
s . lock . RUnlock ( )
return errAlreadyConnected
}
}
s . lock . RUnlock ( )
2020-12-23 12:32:16 +00:00
return p . SendVersionAck ( NewMessage ( CMDVerack , payload . NewNullPayload ( ) ) )
2018-01-28 13:59:32 +00:00
}
2018-03-14 09:36:59 +00:00
// handleBlockCmd processes the received block received from its peer.
2020-01-14 12:32:07 +00:00
func ( s * Server ) handleBlockCmd ( p Peer , block * block . Block ) error {
2021-07-30 13:57:42 +00:00
if s . stateSync . IsActive ( ) {
return s . bSyncQueue . putBlock ( block )
}
2019-09-25 16:54:31 +00:00
return s . bQueue . putBlock ( block )
2018-03-14 09:36:59 +00:00
}
2018-02-01 08:00:42 +00:00
2020-01-17 10:17:19 +00:00
// handlePing processes ping request.
func ( s * Server ) handlePing ( p Peer , ping * payload . Ping ) error {
2020-08-14 13:22:15 +00:00
err := p . HandlePing ( ping )
if err != nil {
return err
}
2021-07-30 13:57:42 +00:00
err = s . requestBlocksOrHeaders ( p )
if err != nil {
return err
2020-08-14 13:22:15 +00:00
}
2020-05-21 10:35:44 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDPong , payload . NewPing ( s . chain . BlockHeight ( ) , s . id ) ) )
2020-01-17 10:17:19 +00:00
}
2021-07-30 13:57:42 +00:00
func ( s * Server ) requestBlocksOrHeaders ( p Peer ) error {
if s . stateSync . NeedHeaders ( ) {
if s . chain . HeaderHeight ( ) < p . LastBlockIndex ( ) {
return s . requestHeaders ( p )
}
return nil
}
2021-08-13 09:46:23 +00:00
var (
2022-01-14 01:09:54 +00:00
bq Blockqueuer = s . chain
2021-08-13 09:46:23 +00:00
requestMPTNodes bool
)
2021-07-30 13:57:42 +00:00
if s . stateSync . IsActive ( ) {
bq = s . stateSync
2021-08-13 09:46:23 +00:00
requestMPTNodes = s . stateSync . NeedMPTNodes ( )
}
if bq . BlockHeight ( ) >= p . LastBlockIndex ( ) {
return nil
}
err := s . requestBlocks ( bq , p )
if err != nil {
return err
2021-07-30 13:57:42 +00:00
}
2021-08-13 09:46:23 +00:00
if requestMPTNodes {
return s . requestMPTNodes ( p , s . stateSync . GetUnknownMPTNodesBatch ( payload . MaxMPTHashesCount ) )
2021-07-30 13:57:42 +00:00
}
return nil
}
// requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers.
func ( s * Server ) requestHeaders ( p Peer ) error {
2021-09-27 13:13:37 +00:00
pl := getRequestBlocksPayload ( p , s . chain . HeaderHeight ( ) , & s . lastRequestedHeader )
return p . EnqueueP2PMessage ( NewMessage ( CMDGetHeaders , pl ) )
2021-07-30 13:57:42 +00:00
}
2020-01-17 10:17:19 +00:00
// handlePing processes pong request.
func ( s * Server ) handlePong ( p Peer , pong * payload . Ping ) error {
2020-01-20 16:02:19 +00:00
err := p . HandlePong ( pong )
if err != nil {
return err
2020-01-17 10:17:19 +00:00
}
2021-07-30 13:57:42 +00:00
return s . requestBlocksOrHeaders ( p )
2020-01-17 10:17:19 +00:00
}
2019-10-22 14:56:03 +00:00
// handleInvCmd processes the received inventory.
2018-03-14 09:36:59 +00:00
func ( s * Server ) handleInvCmd ( p Peer , inv * payload . Inventory ) error {
2019-12-02 08:02:52 +00:00
reqHashes := make ( [ ] util . Uint256 , 0 )
var typExists = map [ payload . InventoryType ] func ( util . Uint256 ) bool {
2021-08-03 19:28:16 +00:00
payload . TXType : s . mempool . ContainsKey ,
2019-12-02 08:02:52 +00:00
payload . BlockType : s . chain . HasBlock ,
2021-01-14 13:38:40 +00:00
payload . ExtensibleType : func ( h util . Uint256 ) bool {
2021-01-18 12:52:51 +00:00
cp := s . extensiblePool . Get ( h )
2019-12-02 08:02:52 +00:00
return cp != nil
} ,
2020-11-27 10:55:48 +00:00
payload . P2PNotaryRequestType : func ( h util . Uint256 ) bool {
return s . notaryRequestPool . ContainsKey ( h )
} ,
2019-12-02 08:02:52 +00:00
}
if exists := typExists [ inv . Type ] ; exists != nil {
for _ , hash := range inv . Hashes {
if ! exists ( hash ) {
reqHashes = append ( reqHashes , hash )
}
}
}
if len ( reqHashes ) > 0 {
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDGetData , payload . NewInventory ( inv . Type , reqHashes ) )
2020-01-16 18:16:31 +00:00
pkt , err := msg . Bytes ( )
if err != nil {
return err
}
2021-01-14 13:38:40 +00:00
if inv . Type == payload . ExtensibleType {
2020-12-22 12:55:55 +00:00
return p . EnqueueHPPacket ( true , pkt )
2020-01-16 18:16:31 +00:00
}
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PPacket ( pkt )
2019-12-02 08:02:52 +00:00
}
return nil
2018-03-09 15:55:25 +00:00
}
2018-02-01 08:00:42 +00:00
2020-06-19 12:03:40 +00:00
// handleMempoolCmd handles getmempool command.
func ( s * Server ) handleMempoolCmd ( p Peer ) error {
2021-08-03 19:28:16 +00:00
txs := s . mempool . GetVerifiedTransactions ( )
2020-06-19 12:03:40 +00:00
hs := make ( [ ] util . Uint256 , 0 , payload . MaxHashesCount )
for i := range txs {
hs = append ( hs , txs [ i ] . Hash ( ) )
if len ( hs ) < payload . MaxHashesCount && i != len ( txs ) - 1 {
continue
}
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . TXType , hs ) )
err := p . EnqueueP2PMessage ( msg )
if err != nil {
return err
}
hs = hs [ : 0 ]
}
return nil
}
2019-10-24 07:18:30 +00:00
// handleInvCmd processes the received inventory.
func ( s * Server ) handleGetDataCmd ( p Peer , inv * payload . Inventory ) error {
2020-07-08 12:25:58 +00:00
var notFound [ ] util . Uint256
2020-01-16 18:16:31 +00:00
for _ , hash := range inv . Hashes {
var msg * Message
switch inv . Type {
case payload . TXType :
2019-10-24 07:18:30 +00:00
tx , _ , err := s . chain . GetTransaction ( hash )
if err == nil {
2020-05-21 10:35:44 +00:00
msg = NewMessage ( CMDTX , tx )
2020-07-08 12:25:58 +00:00
} else {
notFound = append ( notFound , hash )
2019-10-24 07:18:30 +00:00
}
2020-01-16 18:16:31 +00:00
case payload . BlockType :
2019-10-24 07:18:30 +00:00
b , err := s . chain . GetBlock ( hash )
if err == nil {
2020-05-21 10:35:44 +00:00
msg = NewMessage ( CMDBlock , b )
2020-07-08 12:25:58 +00:00
} else {
notFound = append ( notFound , hash )
2019-10-24 07:18:30 +00:00
}
2021-01-14 13:38:40 +00:00
case payload . ExtensibleType :
2021-01-18 12:52:51 +00:00
if cp := s . extensiblePool . Get ( hash ) ; cp != nil {
2021-01-14 13:38:40 +00:00
msg = NewMessage ( CMDExtensible , cp )
2020-01-16 18:16:31 +00:00
}
2020-11-27 10:55:48 +00:00
case payload . P2PNotaryRequestType :
if nrp , ok := s . notaryRequestPool . TryGetData ( hash ) ; ok { // already have checked P2PSigExtEnabled
msg = NewMessage ( CMDP2PNotaryRequest , nrp . ( * payload . P2PNotaryRequest ) )
} else {
notFound = append ( notFound , hash )
}
2020-01-16 18:16:31 +00:00
}
if msg != nil {
pkt , err := msg . Bytes ( )
2020-01-27 08:55:03 +00:00
if err == nil {
2021-01-14 13:38:40 +00:00
if inv . Type == payload . ExtensibleType {
2020-12-22 12:55:55 +00:00
err = p . EnqueueHPPacket ( true , pkt )
2020-01-27 08:55:03 +00:00
} else {
2020-01-23 16:40:40 +00:00
err = p . EnqueueP2PPacket ( pkt )
2020-01-27 08:55:03 +00:00
}
}
2020-01-16 18:16:31 +00:00
if err != nil {
return err
}
2019-11-08 15:40:21 +00:00
}
2019-10-24 07:18:30 +00:00
}
2020-07-08 12:25:58 +00:00
if len ( notFound ) != 0 {
return p . EnqueueP2PMessage ( NewMessage ( CMDNotFound , payload . NewInventory ( inv . Type , notFound ) ) )
}
2019-10-24 07:18:30 +00:00
return nil
}
2021-07-30 13:57:42 +00:00
// handleGetMPTDataCmd processes the received MPT inventory.
func ( s * Server ) handleGetMPTDataCmd ( p Peer , inv * payload . MPTInventory ) error {
if ! s . chain . GetConfig ( ) . P2PStateExchangeExtensions {
return errors . New ( "GetMPTDataCMD was received, but P2PStateExchangeExtensions are disabled" )
}
if s . chain . GetConfig ( ) . KeepOnlyLatestState {
// TODO: implement keeping MPT states for P1 and P2 height (#2095, #2152 related)
return errors . New ( "GetMPTDataCMD was received, but only latest MPT state is supported" )
}
resp := payload . MPTData { }
capLeft := payload . MaxSize - 8 // max(io.GetVarSize(len(resp.Nodes)))
2021-09-06 12:16:47 +00:00
added := make ( map [ util . Uint256 ] struct { } )
2021-07-30 13:57:42 +00:00
for _ , h := range inv . Hashes {
if capLeft <= 2 { // at least 1 byte for len(nodeBytes) and 1 byte for node type
break
}
err := s . stateSync . Traverse ( h ,
2021-09-06 12:16:47 +00:00
func ( n mpt . Node , node [ ] byte ) bool {
if _ , ok := added [ n . Hash ( ) ] ; ok {
return false
}
2021-07-30 13:57:42 +00:00
l := len ( node )
size := l + io . GetVarSize ( l )
if size > capLeft {
return true
}
resp . Nodes = append ( resp . Nodes , node )
2021-09-06 12:16:47 +00:00
added [ n . Hash ( ) ] = struct { } { }
2021-07-30 13:57:42 +00:00
capLeft -= size
return false
} )
if err != nil {
return fmt . Errorf ( "failed to traverse MPT starting from %s: %w" , h . StringBE ( ) , err )
}
}
if len ( resp . Nodes ) > 0 {
msg := NewMessage ( CMDMPTData , & resp )
return p . EnqueueP2PMessage ( msg )
}
return nil
}
// handleMPTDataCmd passes the received MPT nodes to the state sync module.
// It returns an error if P2PStateExchangeExtensions are disabled, otherwise
// whatever s.stateSync.AddMPTNodes returns.
func (s *Server) handleMPTDataCmd(p Peer, data *payload.MPTData) error {
	if enabled := s.chain.GetConfig().P2PStateExchangeExtensions; enabled {
		return s.stateSync.AddMPTNodes(data.Nodes)
	}
	return errors.New("MPTDataCMD was received, but P2PStateExchangeExtensions are disabled")
}
2021-08-13 09:46:23 +00:00
// requestMPTNodes asks the given peer for the specified MPT nodes with a
// single CMDGetMPTData message. An empty list is a no-op; a list longer than
// payload.MaxMPTHashesCount is truncated to that many leading hashes.
// NOTE(review): p is used unconditionally, so it must not be nil.
func (s *Server) requestMPTNodes(p Peer, itms []util.Uint256) error {
	n := len(itms)
	if n == 0 {
		return nil
	}
	if n > payload.MaxMPTHashesCount {
		n = payload.MaxMPTHashesCount
	}
	inv := payload.NewMPTInventory(itms[:n])
	return p.EnqueueP2PMessage(NewMessage(CMDGetMPTData, inv))
}
2019-12-25 16:40:18 +00:00
// handleGetBlocksCmd processes the getblocks request.
func ( s * Server ) handleGetBlocksCmd ( p Peer , gb * payload . GetBlocks ) error {
2020-05-22 14:30:56 +00:00
count := gb . Count
if gb . Count < 0 || gb . Count > payload . MaxHashesCount {
count = payload . MaxHashesCount
2019-12-25 16:40:18 +00:00
}
2020-05-22 14:30:56 +00:00
start , err := s . chain . GetHeader ( gb . HashStart )
2019-12-25 16:40:18 +00:00
if err != nil {
return err
}
blockHashes := make ( [ ] util . Uint256 , 0 )
2020-12-07 15:40:04 +00:00
for i := start . Index + 1 ; i <= start . Index + uint32 ( count ) ; i ++ {
2019-12-25 16:40:18 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
2020-05-22 14:30:56 +00:00
if hash . Equals ( util . Uint256 { } ) {
2019-12-25 16:40:18 +00:00
break
}
blockHashes = append ( blockHashes , hash )
}
if len ( blockHashes ) == 0 {
return nil
}
payload := payload . NewInventory ( payload . BlockType , blockHashes )
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDInv , payload )
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PMessage ( msg )
2019-12-25 16:40:18 +00:00
}
2020-07-31 10:58:22 +00:00
// handleGetBlockByIndexCmd processes the getblockbyindex request.
func ( s * Server ) handleGetBlockByIndexCmd ( p Peer , gbd * payload . GetBlockByIndex ) error {
2020-07-31 11:17:14 +00:00
count := gbd . Count
if gbd . Count < 0 || gbd . Count > payload . MaxHashesCount {
count = payload . MaxHashesCount
}
for i := gbd . IndexStart ; i < gbd . IndexStart + uint32 ( count ) ; i ++ {
2020-07-31 11:51:51 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
if hash . Equals ( util . Uint256 { } ) {
break
}
b , err := s . chain . GetBlock ( hash )
2020-05-22 12:43:46 +00:00
if err != nil {
2020-07-31 11:51:51 +00:00
break
2020-05-22 12:43:46 +00:00
}
msg := NewMessage ( CMDBlock , b )
2020-07-31 11:51:51 +00:00
if err = p . EnqueueP2PMessage ( msg ) ; err != nil {
return err
}
2020-05-22 12:43:46 +00:00
}
return nil
}
2019-11-29 08:08:22 +00:00
// handleGetHeadersCmd processes the getheaders request.
2020-07-31 11:47:42 +00:00
func ( s * Server ) handleGetHeadersCmd ( p Peer , gh * payload . GetBlockByIndex ) error {
if gh . IndexStart > s . chain . HeaderHeight ( ) {
return nil
2019-11-29 08:08:22 +00:00
}
2020-07-31 11:47:42 +00:00
count := gh . Count
if gh . Count < 0 || gh . Count > payload . MaxHeadersAllowed {
count = payload . MaxHeadersAllowed
2019-11-29 08:08:22 +00:00
}
resp := payload . Headers { }
2020-07-31 11:47:42 +00:00
resp . Hdrs = make ( [ ] * block . Header , 0 , count )
for i := gh . IndexStart ; i < gh . IndexStart + uint32 ( count ) ; i ++ {
2019-11-29 08:08:22 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
2020-05-22 14:30:56 +00:00
if hash . Equals ( util . Uint256 { } ) {
2019-11-29 08:08:22 +00:00
break
}
header , err := s . chain . GetHeader ( hash )
if err != nil {
break
}
resp . Hdrs = append ( resp . Hdrs , header )
}
if len ( resp . Hdrs ) == 0 {
return nil
}
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDHeaders , & resp )
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PMessage ( msg )
2019-11-29 08:08:22 +00:00
}
2021-07-30 13:57:42 +00:00
// handleHeadersCmd hands all headers from the received payload over to the
// state sync module and returns its result.
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
	received := h.Hdrs
	return s.stateSync.AddHeaders(received...)
}
2021-01-14 13:38:40 +00:00
// handleExtensibleCmd processes received extensible payload.
func ( s * Server ) handleExtensibleCmd ( e * payload . Extensible ) error {
2021-04-02 09:55:56 +00:00
if ! s . syncReached . Load ( ) {
return nil
2021-02-05 11:54:43 +00:00
}
2021-01-18 12:52:51 +00:00
ok , err := s . extensiblePool . Add ( e )
if err != nil {
2021-01-14 13:38:40 +00:00
return err
}
2021-01-18 12:52:51 +00:00
if ! ok { // payload is already in cache
return nil
2021-01-14 13:38:40 +00:00
}
2022-01-12 18:09:37 +00:00
handler := s . extensHandlers [ e . Category ]
if handler != nil {
err = handler ( e )
2021-02-01 16:00:07 +00:00
if err != nil {
return err
}
2021-01-14 13:38:40 +00:00
}
2022-01-12 18:09:37 +00:00
s . advertiseExtensible ( e )
return nil
}
2021-01-18 12:52:51 +00:00
2022-01-12 18:09:37 +00:00
func ( s * Server ) advertiseExtensible ( e * payload . Extensible ) {
2021-01-18 12:52:51 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . ExtensibleType , [ ] util . Uint256 { e . Hash ( ) } ) )
2022-01-12 20:04:07 +00:00
if e . Category == s . extensHighPrio {
2022-01-12 18:09:37 +00:00
// It's high priority because it directly affects consensus process,
// even though it's just an inv.
2021-01-18 12:52:51 +00:00
s . broadcastHPMessage ( msg )
} else {
s . broadcastMessage ( msg )
}
2019-11-08 15:40:21 +00:00
}
2019-11-15 10:32:40 +00:00
// handleTxCmd processes received transaction.
// It never returns an error.
func ( s * Server ) handleTxCmd ( tx * transaction . Transaction ) error {
2019-11-29 08:09:54 +00:00
// It's OK for it to fail for various reasons like tx already existing
// in the pool.
2021-08-03 19:43:31 +00:00
s . txInLock . Lock ( )
_ , ok := s . txInMap [ tx . Hash ( ) ]
if ok || s . mempool . ContainsKey ( tx . Hash ( ) ) {
s . txInLock . Unlock ( )
return nil
}
s . txInMap [ tx . Hash ( ) ] = struct { } { }
s . txInLock . Unlock ( )
2022-01-14 16:51:04 +00:00
if s . txCallback != nil {
s . txCallback ( tx )
}
2021-02-17 11:51:54 +00:00
if s . verifyAndPoolTX ( tx ) == nil {
2020-11-27 10:55:48 +00:00
s . broadcastTX ( tx , nil )
}
2021-08-03 19:43:31 +00:00
s . txInLock . Lock ( )
delete ( s . txInMap , tx . Hash ( ) )
s . txInLock . Unlock ( )
2020-11-27 10:55:48 +00:00
return nil
}
// handleP2PNotaryRequestCmd process received P2PNotaryRequest payload.
func ( s * Server ) handleP2PNotaryRequestCmd ( r * payload . P2PNotaryRequest ) error {
if ! s . chain . P2PSigExtensionsEnabled ( ) {
return errors . New ( "P2PNotaryRequestCMD was received, but P2PSignatureExtensions are disabled" )
}
2021-02-17 11:51:54 +00:00
// It's OK for it to fail for various reasons like request already existing
// in the pool.
2021-05-12 17:14:35 +00:00
_ = s . RelayP2PNotaryRequest ( r )
2021-02-08 08:48:28 +00:00
return nil
}
// RelayP2PNotaryRequest adds given request to the pool and relays. It does not check
// P2PSigExtensions enabled.
2021-02-17 11:51:54 +00:00
func ( s * Server ) RelayP2PNotaryRequest ( r * payload . P2PNotaryRequest ) error {
err := s . verifyAndPoolNotaryRequest ( r )
if err == nil {
2020-11-27 10:55:48 +00:00
s . broadcastP2PNotaryRequestPayload ( nil , r )
}
2021-02-17 11:51:54 +00:00
return err
2020-11-27 10:55:48 +00:00
}
// verifyAndPoolNotaryRequest verifies NotaryRequest payload and adds it to the payload mempool.
2021-02-17 11:51:54 +00:00
func ( s * Server ) verifyAndPoolNotaryRequest ( r * payload . P2PNotaryRequest ) error {
2022-01-14 01:09:54 +00:00
return s . chain . PoolTxWithData ( r . FallbackTransaction , r , s . notaryRequestPool , s . notaryFeer , s . verifyNotaryRequest )
2020-11-27 10:55:48 +00:00
}
// verifyNotaryRequest is a function for state-dependant P2PNotaryRequest payload verification which is executed before ordinary blockchain's verification.
2022-01-14 01:09:54 +00:00
func ( s * Server ) verifyNotaryRequest ( _ * transaction . Transaction , data interface { } ) error {
2020-11-27 10:55:48 +00:00
r := data . ( * payload . P2PNotaryRequest )
payer := r . FallbackTransaction . Signers [ 1 ] . Account
2022-01-14 01:09:54 +00:00
if _ , err := s . chain . VerifyWitness ( payer , r , & r . Witness , s . chain . GetMaxVerificationGAS ( ) ) ; err != nil {
2020-11-27 10:55:48 +00:00
return fmt . Errorf ( "bad P2PNotaryRequest payload witness: %w" , err )
}
2022-01-14 01:09:54 +00:00
notaryHash := s . chain . GetNotaryContractScriptHash ( )
2020-12-30 08:01:13 +00:00
if r . FallbackTransaction . Sender ( ) != notaryHash {
2020-11-27 10:55:48 +00:00
return errors . New ( "P2PNotary contract should be a sender of the fallback transaction" )
}
2022-01-14 01:09:54 +00:00
depositExpiration := s . chain . GetNotaryDepositExpiration ( payer )
2020-11-27 10:55:48 +00:00
if r . FallbackTransaction . ValidUntilBlock >= depositExpiration {
return fmt . Errorf ( "fallback transaction is valid after deposit is unlocked: ValidUntilBlock is %d, deposit lock expires at %d" , r . FallbackTransaction . ValidUntilBlock , depositExpiration )
2020-01-29 08:56:40 +00:00
}
2019-11-15 10:32:40 +00:00
return nil
}
2020-11-27 10:55:48 +00:00
func ( s * Server ) broadcastP2PNotaryRequestPayload ( _ * transaction . Transaction , data interface { } ) {
2021-02-08 15:58:10 +00:00
r := data . ( * payload . P2PNotaryRequest ) // we can guarantee that cast is successful
2020-11-27 10:55:48 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . P2PNotaryRequestType , [ ] util . Uint256 { r . FallbackTransaction . Hash ( ) } ) )
s . broadcastMessage ( msg )
}
2019-09-13 09:03:07 +00:00
// handleAddrCmd will process received addresses.
func ( s * Server ) handleAddrCmd ( p Peer , addrs * payload . AddressList ) error {
2020-11-25 10:34:38 +00:00
if ! p . CanProcessAddr ( ) {
return errors . New ( "unexpected addr received" )
}
2021-03-26 09:31:07 +00:00
dups := make ( map [ string ] bool )
2019-09-13 09:03:07 +00:00
for _ , a := range addrs . Addrs {
2020-05-22 09:59:18 +00:00
addr , err := a . GetTCPAddress ( )
2021-03-26 09:31:07 +00:00
if err == nil && ! dups [ addr ] {
dups [ addr ] = true
2020-05-22 09:59:18 +00:00
s . discovery . BackFill ( addr )
}
2019-09-13 09:03:07 +00:00
}
return nil
}
2019-09-13 17:38:34 +00:00
// handleGetAddrCmd sends to the peer some good addresses that we know of.
func ( s * Server ) handleGetAddrCmd ( p Peer ) error {
addrs := s . discovery . GoodPeers ( )
2020-10-07 20:29:20 +00:00
if len ( addrs ) > payload . MaxAddrsCount {
addrs = addrs [ : payload . MaxAddrsCount ]
2019-09-13 17:38:34 +00:00
}
alist := payload . NewAddressList ( len ( addrs ) )
ts := time . Now ( )
for i , addr := range addrs {
// we know it's a good address, so it can't fail
2020-05-22 09:59:18 +00:00
netaddr , _ := net . ResolveTCPAddr ( "tcp" , addr . Address )
alist . Addrs [ i ] = payload . NewAddressAndTime ( netaddr , ts , addr . Capabilities )
2019-09-13 17:38:34 +00:00
}
2020-05-21 10:35:44 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDAddr , alist ) )
2019-09-13 17:38:34 +00:00
}
2020-07-31 14:12:13 +00:00
// requestBlocks sends a CMDGetBlockByIndex message to the peer
2018-03-14 09:36:59 +00:00
// to sync up in blocks. A maximum of maxBlockBatch will
2020-11-26 15:53:10 +00:00
// send at once. Two things we need to take care of:
// 1. If possible, blocks should be fetched in parallel.
// height..+500 to one peer, height+500..+1000 to another etc.
// 2. Every block must eventually be fetched even if peer sends no answer.
// Thus the following algorithm is used:
// 1. Block range is divided into chunks of payload.MaxHashesCount.
// 2. Send requests for chunk in increasing order.
// 3. After all requests were sent, request random height.
2022-01-14 01:09:54 +00:00
func ( s * Server ) requestBlocks ( bq Blockqueuer , p Peer ) error {
2022-01-17 21:04:41 +00:00
h := bq . BlockHeight ( )
pl := getRequestBlocksPayload ( p , h , & s . lastRequestedBlock )
lq := s . bQueue . lastQueued ( )
if lq > pl . IndexStart {
c := int16 ( h + blockCacheSize - lq )
if c < payload . MaxHashesCount {
pl . Count = c
}
pl . IndexStart = lq + 1
}
2021-09-27 13:13:37 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDGetBlockByIndex , pl ) )
}
func getRequestBlocksPayload ( p Peer , currHeight uint32 , lastRequestedHeight * atomic . Uint32 ) * payload . GetBlockByIndex {
2020-11-26 15:53:10 +00:00
var peerHeight = p . LastBlockIndex ( )
var needHeight uint32
2021-09-27 13:13:37 +00:00
// lastRequestedBlock can only be increased.
2020-11-26 15:53:10 +00:00
for {
2021-09-27 13:13:37 +00:00
old := lastRequestedHeight . Load ( )
2020-11-26 15:53:10 +00:00
if old <= currHeight {
needHeight = currHeight + 1
2021-09-27 13:13:37 +00:00
if ! lastRequestedHeight . CAS ( old , needHeight ) {
2020-11-26 15:53:10 +00:00
continue
}
} else if old < currHeight + ( blockCacheSize - payload . MaxHashesCount ) {
needHeight = currHeight + 1
if peerHeight > old + payload . MaxHashesCount {
needHeight = old + payload . MaxHashesCount
2021-09-27 13:13:37 +00:00
if ! lastRequestedHeight . CAS ( old , needHeight ) {
2020-11-26 15:53:10 +00:00
continue
}
}
} else {
index := mrand . Intn ( blockCacheSize / payload . MaxHashesCount )
needHeight = currHeight + 1 + uint32 ( index * payload . MaxHashesCount )
}
break
}
2021-09-27 13:13:37 +00:00
return payload . NewGetBlockByIndex ( needHeight , - 1 )
2018-02-01 08:00:42 +00:00
}
2019-10-22 14:56:03 +00:00
// handleMessage processes the given message.
2018-04-13 10:14:08 +00:00
func ( s * Server ) handleMessage ( peer Peer , msg * Message ) error {
2020-01-28 13:40:38 +00:00
s . log . Debug ( "got msg" ,
zap . Stringer ( "addr" , peer . RemoteAddr ( ) ) ,
2020-05-19 11:54:51 +00:00
zap . String ( "type" , msg . Command . String ( ) ) )
2020-01-28 13:40:38 +00:00
2019-09-13 12:43:22 +00:00
if peer . Handshaked ( ) {
2019-10-24 10:10:10 +00:00
if inv , ok := msg . Payload . ( * payload . Inventory ) ; ok {
2020-11-27 10:55:48 +00:00
if ! inv . Type . Valid ( s . chain . P2PSigExtensionsEnabled ( ) ) || len ( inv . Hashes ) == 0 {
2019-10-24 10:10:10 +00:00
return errInvalidInvType
}
}
2020-05-19 11:54:51 +00:00
switch msg . Command {
2019-09-13 12:43:22 +00:00
case CMDAddr :
addrs := msg . Payload . ( * payload . AddressList )
return s . handleAddrCmd ( peer , addrs )
2019-09-13 17:38:34 +00:00
case CMDGetAddr :
// it has no payload
return s . handleGetAddrCmd ( peer )
2019-12-25 16:40:18 +00:00
case CMDGetBlocks :
gb := msg . Payload . ( * payload . GetBlocks )
return s . handleGetBlocksCmd ( peer , gb )
2020-07-31 10:58:22 +00:00
case CMDGetBlockByIndex :
gbd := msg . Payload . ( * payload . GetBlockByIndex )
return s . handleGetBlockByIndexCmd ( peer , gbd )
2019-10-24 07:18:30 +00:00
case CMDGetData :
inv := msg . Payload . ( * payload . Inventory )
return s . handleGetDataCmd ( peer , inv )
2021-07-30 13:57:42 +00:00
case CMDGetMPTData :
inv := msg . Payload . ( * payload . MPTInventory )
return s . handleGetMPTDataCmd ( peer , inv )
case CMDMPTData :
inv := msg . Payload . ( * payload . MPTData )
return s . handleMPTDataCmd ( peer , inv )
2019-11-29 08:08:22 +00:00
case CMDGetHeaders :
2020-07-31 11:47:42 +00:00
gh := msg . Payload . ( * payload . GetBlockByIndex )
2019-11-29 08:08:22 +00:00
return s . handleGetHeadersCmd ( peer , gh )
2021-07-30 13:57:42 +00:00
case CMDHeaders :
h := msg . Payload . ( * payload . Headers )
return s . handleHeadersCmd ( peer , h )
2019-09-13 12:43:22 +00:00
case CMDInv :
inventory := msg . Payload . ( * payload . Inventory )
return s . handleInvCmd ( peer , inventory )
2020-06-19 12:03:40 +00:00
case CMDMempool :
// no payload
return s . handleMempoolCmd ( peer )
2019-09-13 12:43:22 +00:00
case CMDBlock :
2020-01-14 12:32:07 +00:00
block := msg . Payload . ( * block . Block )
2019-09-13 12:43:22 +00:00
return s . handleBlockCmd ( peer , block )
2021-01-14 13:38:40 +00:00
case CMDExtensible :
cp := msg . Payload . ( * payload . Extensible )
return s . handleExtensibleCmd ( cp )
2019-11-15 10:32:40 +00:00
case CMDTX :
tx := msg . Payload . ( * transaction . Transaction )
return s . handleTxCmd ( tx )
2020-11-27 10:55:48 +00:00
case CMDP2PNotaryRequest :
r := msg . Payload . ( * payload . P2PNotaryRequest )
return s . handleP2PNotaryRequestCmd ( r )
2020-01-17 10:17:19 +00:00
case CMDPing :
ping := msg . Payload . ( * payload . Ping )
return s . handlePing ( peer , ping )
case CMDPong :
pong := msg . Payload . ( * payload . Ping )
return s . handlePong ( peer , pong )
2019-09-13 12:43:22 +00:00
case CMDVersion , CMDVerack :
2020-05-19 11:54:51 +00:00
return fmt . Errorf ( "received '%s' after the handshake" , msg . Command . String ( ) )
2019-09-13 12:43:22 +00:00
}
} else {
2020-05-19 11:54:51 +00:00
switch msg . Command {
2019-09-13 12:43:22 +00:00
case CMDVersion :
version := msg . Payload . ( * payload . Version )
return s . handleVersionCmd ( peer , version )
case CMDVerack :
err := peer . HandleVersionAck ( )
if err != nil {
return err
}
2020-01-15 14:03:42 +00:00
go peer . StartProtocol ( )
2019-11-15 10:32:40 +00:00
2021-07-30 13:57:42 +00:00
s . tryInitStateSync ( )
2021-04-02 09:55:56 +00:00
s . tryStartServices ( )
2019-09-13 12:43:22 +00:00
default :
2020-05-19 11:54:51 +00:00
return fmt . Errorf ( "received '%s' during handshake" , msg . Command . String ( ) )
2018-03-14 09:36:59 +00:00
}
}
return nil
2018-01-26 18:04:13 +00:00
}
Implement rpc server method: sendrawtransaction (#174)
* Added new config attributes: 'SecondsPerBlock','LowPriorityThreshold'
* Added new files:
* Added new method: CompareTo
* Fixed empty Slice case
* Added new methods: LessThan, GreaterThan, Equal, CompareTo
* Added new method: InputIntersection
* Added MaxTransactionSize, GroupOutputByAssetID
* Added ned method: ScriptHash
* Added new method: IsDoubleSpend
* Refactor blockchainer, Added Feer interface, Verify and GetMemPool method
* 1) Added MemPool
2) Added new methods to satisfy the blockchainer interface: IsLowPriority, Verify, GetMemPool
* Added new methods: RelayTxn, RelayDirectly
* Fixed tests
* Implemented RPC server method sendrawtransaction
* Refactor getrawtransaction, sendrawtransaction in separate methods
* Moved 'secondsPerBlock' to config file
* Implemented Kim suggestions:
1) Fixed data race issues
2) refactor Verify method
3) Get rid of unused InputIntersection method due to refactoring Verify method
4) Fixed bug in https://github.com/CityOfZion/neo-go/pull/174#discussion_r264108135
5) minor simplications of the code
* Fixed minor issues related to
1) space
2) getter methods do not need pointer on the receiver
3) error message
4) refactoring CompareTo method in uint256.go
* Fixed small issues
* Use sync.RWMutex instead of sync.Mutex
* Refined (R)Lock/(R)Unlock
* return error instead of bool in Verify methods
2019-03-20 12:30:05 +00:00
2021-07-30 13:57:42 +00:00
// tryInitStateSync initializes the state sync module once enough peers have
// completed the handshake.
//
// If the module is not active the block sync queue is discarded and nothing
// else is done; if the module is already initialized the call is a no-op.
// Otherwise the last block indexes of all handshaked peers are collected and,
// provided there are at least s.MinPeers of them, the module is initialized
// with the median index. An initialization failure is logged with Fatal.
func (s *Server) tryInitStateSync() {
	if !s.stateSync.IsActive() {
		s.bSyncQueue.discard()
		return
	}
	if s.stateSync.IsInitialized() {
		return
	}

	var heights []uint32
	s.lock.RLock()
	for p := range s.peers {
		if p.Handshaked() {
			heights = append(heights, p.LastBlockIndex())
		}
	}
	s.lock.RUnlock()

	if len(heights) < s.MinPeers || len(heights) == 0 {
		return
	}
	sort.Slice(heights, func(a, b int) bool { return heights[a] < heights[b] })

	// choose the height of the median peer as current chain's height
	median := heights[len(heights)/2]
	if err := s.stateSync.Init(median); err != nil {
		s.log.Fatal("failed to init state sync module",
			zap.Uint32("evaluated chain's blockHeight", median),
			zap.Uint32("blockHeight", s.chain.BlockHeight()),
			zap.Uint32("headerHeight", s.chain.HeaderHeight()),
			zap.Error(err))
	}

	// module can be inactive after init (i.e. full state is collected and ordinary block processing is needed)
	if !s.stateSync.IsActive() {
		s.bSyncQueue.discard()
	}
}
2022-01-12 18:09:37 +00:00
// BroadcastExtensible add locally-generated Extensible payload to the pool
// and advertises it to peers.
func ( s * Server ) BroadcastExtensible ( p * payload . Extensible ) {
2021-01-18 12:52:51 +00:00
_ , err := s . extensiblePool . Add ( p )
if err != nil {
s . log . Error ( "created payload is not valid" , zap . Error ( err ) )
return
}
2022-01-12 18:09:37 +00:00
s . advertiseExtensible ( p )
2019-11-15 10:32:40 +00:00
}
2022-01-12 20:04:07 +00:00
// RequestTx asks for given transactions from Server peers using GetData message.
func ( s * Server ) RequestTx ( hashes ... util . Uint256 ) {
2019-11-15 10:32:40 +00:00
if len ( hashes ) == 0 {
return
}
2020-12-07 15:40:36 +00:00
for i := 0 ; i <= len ( hashes ) / payload . MaxHashesCount ; i ++ {
2020-09-04 13:04:40 +00:00
start := i * payload . MaxHashesCount
stop := ( i + 1 ) * payload . MaxHashesCount
2020-12-07 15:40:36 +00:00
if stop > len ( hashes ) {
2020-09-04 13:04:40 +00:00
stop = len ( hashes )
}
2020-12-07 15:40:36 +00:00
if start == stop {
break
}
2020-09-04 13:04:40 +00:00
msg := NewMessage ( CMDGetData , payload . NewInventory ( payload . TXType , hashes [ start : stop ] ) )
// It's high priority because it directly affects consensus process,
// even though it's getdata.
s . broadcastHPMessage ( msg )
}
2019-11-15 10:32:40 +00:00
}
2020-01-22 08:01:13 +00:00
// iteratePeersWithSendMsg sends given message to all peers using two functions
// passed, one is to send the message and the other is to filtrate peers (the
// peer is considered invalid if it returns false).
2020-12-22 12:55:55 +00:00
func ( s * Server ) iteratePeersWithSendMsg ( msg * Message , send func ( Peer , bool , [ ] byte ) error , peerOK func ( Peer ) bool ) {
2021-08-06 12:25:41 +00:00
var deadN , peerN , sentN int
2020-08-31 16:07:28 +00:00
// Get a copy of s.peers to avoid holding a lock while sending.
2021-08-06 12:04:13 +00:00
peers := s . getPeers ( peerOK )
2021-08-06 12:25:41 +00:00
peerN = len ( peers )
if peerN == 0 {
2020-08-31 16:07:28 +00:00
return
}
2021-08-06 12:25:41 +00:00
mrand . Shuffle ( peerN , func ( i , j int ) {
2021-08-05 21:03:46 +00:00
peers [ i ] , peers [ j ] = peers [ j ] , peers [ i ]
} )
2020-01-22 08:01:13 +00:00
pkt , err := msg . Bytes ( )
if err != nil {
return
}
2020-12-22 12:55:55 +00:00
2021-08-06 12:25:41 +00:00
// If true, this node isn't counted any more, either it's dead or we
// have already sent an Inv to it.
finished := make ( [ ] bool , peerN )
network: merge two loops in iteratePeersWithSendMsg, send to 2/3
Refactor code and be fine with sending to just 2/3 of proper peers. Previously
it was an edge case, but it can be a normal thing to do also as broadcasting
to everyone is obviously too expensive and excessive (hi, #608).
Baseline (four node, 10 workers):
RPS 8180.760 8137.822 7858.358 7820.011 8051.076 ≈ 8010 ± 2.04%
TPS 7819.831 7521.172 7519.023 7242.965 7426.000 ≈ 7506 ± 2.78%
CPU % 41.983 38.775 40.606 39.375 35.537 ≈ 39.3 ± 6.15%
Mem MB 2947.189 2743.658 2896.688 2813.276 2863.108 ≈ 2853 ± 2.74%
Patched:
RPS 9714.567 9676.102 9358.609 9371.408 9301.372 ≈ 9484 ± 2.05% ↑ 18.40%
TPS 8809.796 8796.854 8534.754 8661.158 8426.162 ≈ 8646 ± 1.92% ↑ 15.19%
CPU % 44.980 45.018 33.640 29.645 43.830 ≈ 39.4 ± 18.41% ↑ 0.25%
Mem MB 2989.078 2976.577 2306.185 2351.929 2910.479 ≈ 2707 ± 12.80% ↓ 5.12%
There is a nuance with this patch however. While typically it works the way
outlined above, sometimes it works like this:
RPS ≈ 6734.368
TPS ≈ 6299.332
CPU ≈ 25.552%
Mem ≈ 2706.046MB
And that's because the log looks like this:
DeltaTime, TransactionsCount, TPS
5014, 44212, 8817.710
5163, 49690, 9624.249
5166, 49523, 9586.334
5189, 49693, 9576.604
5198, 49339, 9491.920
5147, 49559, 9628.716
5192, 49680, 9568.567
5163, 49750, 9635.871
5183, 49189, 9490.450
5159, 49653, 9624.540
5167, 47945, 9279.079
5179, 2051, 396.022
5015, 4, 0.798
5004, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5004, 0, 0.000
5003, 2925, 584.649
5040, 49099, 9741.865
5161, 49718, 9633.404
5170, 49228, 9521.857
5179, 49773, 9610.543
5167, 47253, 9145.152
5202, 49788, 9570.934
5177, 47704, 9214.603
5209, 46610, 8947.975
5249, 49156, 9364.831
5163, 18284, 3541.352
5072, 174, 34.306
On a network with 4 CNs and 1 RPC node there is 1/256 probability that a block
won't be broadcasted to RPC node, so it won't see it until ping timeout kicks
in. While it doesn't see a block it can't accept new incoming transactions so
the bench gets stuck basically. To me that's an acceptable trade-off because
normal networks are much larger than that and the effect of this patch is way
more important there, but still that's what we have and we need to take into
account.
2021-08-06 12:36:46 +00:00
// Try non-blocking sends first and only block if have to.
for _ , blocking := range [ ] bool { false , true } {
for i , peer := range peers {
// Send to 2/3 of good peers.
if 3 * sentN >= 2 * ( peerN - deadN ) {
return
2021-08-06 12:25:41 +00:00
}
network: merge two loops in iteratePeersWithSendMsg, send to 2/3
Refactor code and be fine with sending to just 2/3 of proper peers. Previously
it was an edge case, but it can be a normal thing to do also as broadcasting
to everyone is obviously too expensive and excessive (hi, #608).
Baseline (four node, 10 workers):
RPS 8180.760 8137.822 7858.358 7820.011 8051.076 ≈ 8010 ± 2.04%
TPS 7819.831 7521.172 7519.023 7242.965 7426.000 ≈ 7506 ± 2.78%
CPU % 41.983 38.775 40.606 39.375 35.537 ≈ 39.3 ± 6.15%
Mem MB 2947.189 2743.658 2896.688 2813.276 2863.108 ≈ 2853 ± 2.74%
Patched:
RPS 9714.567 9676.102 9358.609 9371.408 9301.372 ≈ 9484 ± 2.05% ↑ 18.40%
TPS 8809.796 8796.854 8534.754 8661.158 8426.162 ≈ 8646 ± 1.92% ↑ 15.19%
CPU % 44.980 45.018 33.640 29.645 43.830 ≈ 39.4 ± 18.41% ↑ 0.25%
Mem MB 2989.078 2976.577 2306.185 2351.929 2910.479 ≈ 2707 ± 12.80% ↓ 5.12%
There is a nuance with this patch however. While typically it works the way
outlined above, sometimes it works like this:
RPS ≈ 6734.368
TPS ≈ 6299.332
CPU ≈ 25.552%
Mem ≈ 2706.046MB
And that's because the log looks like this:
DeltaTime, TransactionsCount, TPS
5014, 44212, 8817.710
5163, 49690, 9624.249
5166, 49523, 9586.334
5189, 49693, 9576.604
5198, 49339, 9491.920
5147, 49559, 9628.716
5192, 49680, 9568.567
5163, 49750, 9635.871
5183, 49189, 9490.450
5159, 49653, 9624.540
5167, 47945, 9279.079
5179, 2051, 396.022
5015, 4, 0.798
5004, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5004, 0, 0.000
5003, 2925, 584.649
5040, 49099, 9741.865
5161, 49718, 9633.404
5170, 49228, 9521.857
5179, 49773, 9610.543
5167, 47253, 9145.152
5202, 49788, 9570.934
5177, 47704, 9214.603
5209, 46610, 8947.975
5249, 49156, 9364.831
5163, 18284, 3541.352
5072, 174, 34.306
On a network with 4 CNs and 1 RPC node there is 1/256 probability that a block
won't be broadcasted to RPC node, so it won't see it until ping timeout kicks
in. While it doesn't see a block it can't accept new incoming transactions so
the bench gets stuck basically. To me that's an acceptable trade-off because
normal networks are much larger than that and the effect of this patch is way
more important there, but still that's what we have and we need to take into
account.
2021-08-06 12:36:46 +00:00
if finished [ i ] {
continue
}
err := send ( peer , blocking , pkt )
switch err {
case nil :
if msg . Command == CMDGetAddr {
peer . AddGetAddrSent ( )
}
sentN ++
case errBusy : // Can be retried.
continue
default :
2021-08-06 12:25:41 +00:00
deadN ++
}
network: merge two loops in iteratePeersWithSendMsg, send to 2/3
Refactor code and be fine with sending to just 2/3 of proper peers. Previously
it was an edge case, but it can be a normal thing to do also as broadcasting
to everyone is obviously too expensive and excessive (hi, #608).
Baseline (four node, 10 workers):
RPS 8180.760 8137.822 7858.358 7820.011 8051.076 ≈ 8010 ± 2.04%
TPS 7819.831 7521.172 7519.023 7242.965 7426.000 ≈ 7506 ± 2.78%
CPU % 41.983 38.775 40.606 39.375 35.537 ≈ 39.3 ± 6.15%
Mem MB 2947.189 2743.658 2896.688 2813.276 2863.108 ≈ 2853 ± 2.74%
Patched:
RPS 9714.567 9676.102 9358.609 9371.408 9301.372 ≈ 9484 ± 2.05% ↑ 18.40%
TPS 8809.796 8796.854 8534.754 8661.158 8426.162 ≈ 8646 ± 1.92% ↑ 15.19%
CPU % 44.980 45.018 33.640 29.645 43.830 ≈ 39.4 ± 18.41% ↑ 0.25%
Mem MB 2989.078 2976.577 2306.185 2351.929 2910.479 ≈ 2707 ± 12.80% ↓ 5.12%
There is a nuance with this patch however. While typically it works the way
outlined above, sometimes it works like this:
RPS ≈ 6734.368
TPS ≈ 6299.332
CPU ≈ 25.552%
Mem ≈ 2706.046MB
And that's because the log looks like this:
DeltaTime, TransactionsCount, TPS
5014, 44212, 8817.710
5163, 49690, 9624.249
5166, 49523, 9586.334
5189, 49693, 9576.604
5198, 49339, 9491.920
5147, 49559, 9628.716
5192, 49680, 9568.567
5163, 49750, 9635.871
5183, 49189, 9490.450
5159, 49653, 9624.540
5167, 47945, 9279.079
5179, 2051, 396.022
5015, 4, 0.798
5004, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5003, 0, 0.000
5004, 0, 0.000
5003, 2925, 584.649
5040, 49099, 9741.865
5161, 49718, 9633.404
5170, 49228, 9521.857
5179, 49773, 9610.543
5167, 47253, 9145.152
5202, 49788, 9570.934
5177, 47704, 9214.603
5209, 46610, 8947.975
5249, 49156, 9364.831
5163, 18284, 3541.352
5072, 174, 34.306
On a network with 4 CNs and 1 RPC node there is 1/256 probability that a block
won't be broadcasted to RPC node, so it won't see it until ping timeout kicks
in. While it doesn't see a block it can't accept new incoming transactions so
the bench gets stuck basically. To me that's an acceptable trade-off because
normal networks are much larger than that and the effect of this patch is way
more important there, but still that's what we have and we need to take into
account.
2021-08-06 12:36:46 +00:00
finished [ i ] = true
2020-12-22 12:55:55 +00:00
}
2019-11-15 10:32:40 +00:00
}
}
2020-01-22 08:01:13 +00:00
// broadcastMessage distributes the message among peers via
// iteratePeersWithSendMsg using Peer.EnqueuePacket for sending and no peer
// filter.
func (s *Server) broadcastMessage(msg *Message) {
	var noFilter func(Peer) bool
	s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, noFilter)
}
// broadcastHPMessage distributes the high-priority message among peers via
// iteratePeersWithSendMsg using Peer.EnqueueHPPacket for sending and no peer
// filter.
func (s *Server) broadcastHPMessage(msg *Message) {
	var noFilter func(Peer) bool
	s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, noFilter)
}
2020-05-07 20:00:38 +00:00
// relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them
// to the network. Intended to be run as a separate goroutine.
//
// For every block received from the subscription it sends a CMDInv message
// carrying only the block hash and then passes the block index to
// s.extensiblePool.RemoveStale. The loop unsubscribes from the chain and
// returns once s.quit becomes readable.
func ( s * Server ) relayBlocksLoop ( ) {
ch := make ( chan * block . Block , 2 ) // Some buffering to smooth out possible egressing delays.
s . chain . SubscribeForBlocks ( ch )
for {
select {
case <- s . quit :
// Shutdown requested: drop the subscription before leaving.
s . chain . UnsubscribeFromBlocks ( ch )
return
case b := <- ch :
// Announce the block by hash (inventory of BlockType), not the block itself.
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . BlockType , [ ] util . Uint256 { b . Hash ( ) } ) )
// Filter out nodes that are more current (avoid spamming the network
// during initial sync): only handshaked peers whose last known block
// index is below the index of this block receive the message.
s . iteratePeersWithSendMsg ( msg , Peer . EnqueuePacket , func ( p Peer ) bool {
return p . Handshaked ( ) && p . LastBlockIndex ( ) < b . Index
} )
2021-01-18 12:52:51 +00:00
s . extensiblePool . RemoveStale ( b . Index )
2020-05-07 20:00:38 +00:00
}
}
2019-11-29 09:27:15 +00:00
}
2020-01-29 08:56:40 +00:00
// verifyAndPoolTX verifies the TX and adds it to the local mempool.
// It is a thin wrapper: the work is delegated to s.chain.PoolTx and its error
// is returned unchanged.
2021-02-17 11:51:54 +00:00
func ( s * Server ) verifyAndPoolTX ( t * transaction . Transaction ) error {
return s . chain . PoolTx ( t )
2020-01-29 08:56:40 +00:00
}
// RelayTxn adds a new transaction to the local node's pool and relays it to
// the connected peers. The transaction is handed to broadcastTX only when
// verifyAndPoolTX succeeds; the error from verifyAndPoolTX (nil on success) is
// returned to the caller.
// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159
2021-02-17 11:51:54 +00:00
func ( s * Server ) RelayTxn ( t * transaction . Transaction ) error {
err := s . verifyAndPoolTX ( t )
if err == nil {
2020-11-27 10:55:48 +00:00
s . broadcastTX ( t , nil )
2020-01-29 08:56:40 +00:00
}
2021-02-17 11:51:54 +00:00
return err
2020-01-29 08:56:40 +00:00
}
Implement rpc server method: sendrawtransaction (#174)
* Added new config attributes: 'SecondsPerBlock','LowPriorityThreshold'
* Added new files:
* Added new method: CompareTo
* Fixed empty Slice case
* Added new methods: LessThan, GreaterThan, Equal, CompareTo
* Added new method: InputIntersection
* Added MaxTransactionSize, GroupOutputByAssetID
* Added new method: ScriptHash
* Added new method: IsDoubleSpend
* Refactor blockchainer, Added Feer interface, Verify and GetMemPool method
* 1) Added MemPool
2) Added new methods to satisfy the blockchainer interface: IsLowPriority, Verify, GetMemPool
* Added new methods: RelayTxn, RelayDirectly
* Fixed tests
* Implemented RPC server method sendrawtransaction
* Refactor getrawtransaction, sendrawtransaction in separate methods
* Moved 'secondsPerBlock' to config file
* Implemented Kim suggestions:
1) Fixed data race issues
2) refactor Verify method
3) Get rid of unused InputIntersection method due to refactoring Verify method
4) Fixed bug in https://github.com/CityOfZion/neo-go/pull/174#discussion_r264108135
5) minor simplifications of the code
* Fixed minor issues related to
1) space
2) getter methods do not need pointer on the receiver
3) error message
4) refactoring CompareTo method in uint256.go
* Fixed small issues
* Use sync.RWMutex instead of sync.Mutex
* Refined (R)Lock/(R)Unlock
* return error instead of bool in Verify methods
2019-03-20 12:30:05 +00:00
2020-01-29 08:56:40 +00:00
// broadcastTX broadcasts an inventory message about new transaction.
// It does not send anything itself: the transaction is pushed to the
// s.transactions channel. The call blocks until the channel accepts the
// transaction or s.quit becomes readable, in which case the transaction is
// dropped. The second parameter is ignored.
2020-11-27 10:55:48 +00:00
func ( s * Server ) broadcastTX ( t * transaction . Transaction , _ interface { } ) {
2020-02-18 15:11:55 +00:00
select {
case s . transactions <- t :
case <- s . quit :
}
}
// broadcastTxHashes announces the given transaction hashes in a single CMDInv
// message with an inventory of TXType. Peer.IsFullNode is passed as the peer
// filter to iteratePeersWithSendMsg.
func ( s * Server ) broadcastTxHashes ( hs [ ] util . Uint256 ) {
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . TXType , hs ) )
2020-01-22 08:01:13 +00:00
// We need to filter out non-relaying nodes, so plain broadcast
// functions don't fit here.
2020-05-22 09:17:17 +00:00
s . iteratePeersWithSendMsg ( msg , Peer . EnqueuePacket , Peer . IsFullNode )
Implement rpc server method: sendrawtransaction (#174)
* Added new config attributes: 'SecondsPerBlock','LowPriorityThreshold'
* Added new files:
* Added new method: CompareTo
* Fixed empty Slice case
* Added new methods: LessThan, GreaterThan, Equal, CompareTo
* Added new method: InputIntersection
* Added MaxTransactionSize, GroupOutputByAssetID
* Added ned method: ScriptHash
* Added new method: IsDoubleSpend
* Refactor blockchainer, Added Feer interface, Verify and GetMemPool method
* 1) Added MemPool
2) Added new methods to satisfy the blockchainer interface: IsLowPriority, Verify, GetMemPool
* Added new methods: RelayTxn, RelayDirectly
* Fixed tests
* Implemented RPC server method sendrawtransaction
* Refactor getrawtransaction, sendrawtransaction in separate methods
* Moved 'secondsPerBlock' to config file
* Implemented Kim suggestions:
1) Fixed data race issues
2) refactor Verify method
3) Get rid of unused InputIntersection method due to refactoring Verify method
4) Fixed bug in https://github.com/CityOfZion/neo-go/pull/174#discussion_r264108135
5) minor simplications of the code
* Fixed minor issues related to
1) space
2) getter methods do not need pointer on the receiver
3) error message
4) refactoring CompareTo method in uint256.go
* Fixed small issues
* Use sync.RWMutex instead of sync.Mutex
* Refined (R)Lock/(R)Unlock
* return error instead of bool in Verify methods
2019-03-20 12:30:05 +00:00
}
2020-02-18 15:11:55 +00:00
2020-11-27 10:55:48 +00:00
// initStaleMemPools initializes mempools for stale tx/payload processing.
// The resend threshold is the larger of 5 and twice the configured
// ValidatorsCount. It is registered on s.mempool with s.broadcastTX as the
// callback and, when P2PSigExtensions are enabled on the chain, on
// s.notaryRequestPool with s.broadcastP2PNotaryRequestPayload.
func ( s * Server ) initStaleMemPools ( ) {
2020-11-11 12:49:51 +00:00
cfg := s . chain . GetConfig ( )
// Start from the minimum of 5 and raise it if 2*ValidatorsCount is larger.
threshold := 5
if cfg . ValidatorsCount * 2 > threshold {
threshold = cfg . ValidatorsCount * 2
}
2021-08-03 19:28:16 +00:00
s . mempool . SetResendThreshold ( uint32 ( threshold ) , s . broadcastTX )
2020-11-27 10:55:48 +00:00
// The notary request pool is only configured when P2P signature extensions are on.
if s . chain . P2PSigExtensionsEnabled ( ) {
s . notaryRequestPool . SetResendThreshold ( uint32 ( threshold ) , s . broadcastP2PNotaryRequestPayload )
}
2020-11-11 12:49:51 +00:00
}
2020-02-18 15:11:55 +00:00
// broadcastTxLoop gathers hashes of transactions arriving on s.transactions
// and announces them in batches through broadcastTxHashes. A batch is flushed
// as soon as it holds batchSize hashes, or when batchTime has elapsed since
// the first hash of the batch was queued. When s.quit becomes readable the
// loop discards whatever is still buffered in s.transactions and returns.
func (s *Server) broadcastTxLoop() {
	const (
		batchTime = time.Millisecond * 50
		batchSize = 32
	)

	var (
		pending    = make([]util.Uint256, 0, batchSize)
		flushTimer *time.Timer
	)

	// flush sends the accumulated hashes, empties the batch (keeping its
	// backing storage) and stops the timer belonging to that batch.
	flush := func() {
		s.broadcastTxHashes(pending)
		pending = pending[:0]
		if flushTimer != nil {
			flushTimer.Stop()
		}
	}

	for {
		// Until the first batch is started there is no timer; a nil channel
		// is never ready, so the expiry case stays disabled.
		var expired <-chan time.Time
		if flushTimer != nil {
			expired = flushTimer.C
		}

		select {
		case <-s.quit:
			// Drain queued transactions without blocking, then leave.
			for {
				select {
				case <-s.transactions:
					continue
				default:
				}
				return
			}
		case <-expired:
			// The timer may fire after the batch was already flushed by
			// size; in that case there is nothing to send.
			if len(pending) != 0 {
				flush()
			}
		case tx := <-s.transactions:
			// The first hash of a batch starts the flush countdown.
			if len(pending) == 0 {
				flushTimer = time.NewTimer(batchTime)
			}
			pending = append(pending, tx.Hash())
			if len(pending) == batchSize {
				flush()
			}
		}
	}
}
2020-06-10 07:01:21 +00:00
2021-04-30 08:24:43 +00:00
// Port returns a server port that should be used in P2P version exchange. If
// `AnnouncedPort` is set in the server.Config, the announced node port
// is returned (e.g. consider the node running behind NAT). If `AnnouncedPort`
// isn't set, the port returned may still differ from that of server.Config:
// it is taken from the transport's address, and the configured Port is used
// only when that address can't be split into host and port. An error is
// returned when the port part of the transport address is not a valid 16-bit
// unsigned decimal number.
2020-06-10 07:01:21 +00:00
func ( s * Server ) Port ( ) ( uint16 , error ) {
2021-04-30 08:24:43 +00:00
if s . AnnouncedPort != 0 {
return s . ServerConfig . AnnouncedPort , nil
}
2020-06-10 07:01:21 +00:00
var port uint16
_ , portStr , err := net . SplitHostPort ( s . transport . Address ( ) )
if err != nil {
// The transport address has no usable host:port form; fall back to the config.
port = s . ServerConfig . Port
} else {
// bitSize 16 makes ParseUint reject values that don't fit into uint16.
p , err := strconv . ParseUint ( portStr , 10 , 16 )
if err != nil {
return 0 , err
}
port = uint16 ( p )
}
return port , nil
}