2018-01-26 18:04:13 +00:00
package network
import (
network: rework broadcast logic
We have a number of queues for different purposes:
* regular broadcast queue
* direct p2p queue
* high-priority queue
And two basic egress scenarios:
* direct p2p messages (replies to requests in Server's handle* methods)
* broadcasted messages
Low-priority broadcasted messages:
* transaction inventories
* block inventories
* notary inventories
* non-consensus extensibles
High-priority broadcasted messages:
* consensus extensibles
* getdata transaction requests from consensus process
* getaddr requests
P2P messages are a bit more complicated: most of the time they use the p2p
queue, but extensible message requests/replies use the HP queue.
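The queue assignment listed above can be summarized as a small lookup. This is
only an illustrative sketch: queueKind, msgKind, all of the constant names and
queueFor are hypothetical and are not the identifiers used by the actual code.

```go
package main

import "fmt"

// queueKind and msgKind are hypothetical names used only in this sketch.
type queueKind int

const (
	broadcastQueue queueKind = iota // regular (low-priority) broadcast queue
	p2pQueue                        // direct p2p queue
	hpQueue                         // high-priority queue
)

type msgKind int

const (
	txInv msgKind = iota
	blockInv
	notaryInv
	nonConsensusExtensible
	consensusExtensible
	consensusGetData
	getAddr
	extensibleRequestOrReply
	otherP2PReply
)

// queueFor returns the queue a message of the given kind goes to, following
// the lists from the commit message.
func queueFor(k msgKind) queueKind {
	switch k {
	case txInv, blockInv, notaryInv, nonConsensusExtensible:
		return broadcastQueue
	case consensusExtensible, consensusGetData, getAddr, extensibleRequestOrReply:
		return hpQueue
	default:
		// Every other direct reply uses the p2p queue.
		return p2pQueue
	}
}

func main() {
	names := map[queueKind]string{
		broadcastQueue: "broadcast",
		p2pQueue:       "p2p",
		hpQueue:        "high-priority",
	}
	fmt.Println("tx inventory ->", names[queueFor(txInv)])
	fmt.Println("getaddr ->", names[queueFor(getAddr)])
	fmt.Println("plain p2p reply ->", names[queueFor(otherP2PReply)])
}
```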
Server's handle* code is run from Peer's handleIncoming; every peer has this
thread that handles incoming messages. When working with a peer it's
important to reply to its requests, and blocking this thread until we send
(queue) a reply is fine: if the peer is slow, we just won't get anything new
from it. The queue used is irrelevant with respect to this issue.
Broadcasted messages are radically different: we want them to be delivered to
many peers, but we don't care about specific ones. If a message is delivered
to 2/3 of the peers we're fine, and if it's delivered to more of them, that's
not an issue. But doing this fairly is not easy. The current code tries
non-blocking sends first and, if this doesn't yield enough results, it then
blocks (with a timeout, since we can't wait indefinitely). But it does so
sequentially: once a peer is chosen, the code waits for it (and only it)
until the timeout happens.
What can be done instead is an attempt to push the message to all of the peers
simultaneously (or close to that). If they all deliver, OK; if some block
and wait, then we can wait until _any_ of them pushes the message through (or
a global timeout happens, we still can't wait forever). Once we have enough
deliveries we can cancel the pending ones, and it's again not an error if
these canceled threads still do their job.
This makes the system more dynamic and adds some substantial processing
overhead, but this is networking code, so any of this overhead is much lower
than the actual packet delivery time. It also spreads the load more fairly:
if there is any spare queue, it'll get the packet and release the
broadcaster. On the next broadcast iteration another peer is more likely to be
chosen just because it didn't get a message previously (and had some time to
deliver already queued messages).
It works perfectly in tests: with optimal networking conditions we have much
better block times, and TPS increases by 5-25% depending on the scenario.
I'd go as far as to say that it fixes the original problem of #2678, because
in this particular scenario we have empty queues in ~100% of the cases and
this new logic will likely lead to 100% fan-out in this case (cancellation just
won't happen fast enough). But when the load grows and there is some waiting
in the queue, it will optimize out the slowest links.
2022-10-10 19:48:06 +00:00
"context"
2019-11-15 17:42:23 +00:00
"crypto/rand"
"encoding/binary"
2018-03-14 09:36:59 +00:00
"errors"
2018-01-26 18:04:13 +00:00
"fmt"
2022-01-14 01:09:54 +00:00
"math/big"
2020-11-26 15:53:10 +00:00
mrand "math/rand"
2019-09-13 17:38:34 +00:00
"net"
2021-07-30 13:57:42 +00:00
"sort"
2019-10-29 17:51:17 +00:00
"strconv"
2018-03-14 09:36:59 +00:00
"sync"
2018-01-28 10:12:05 +00:00
"time"
2018-01-27 15:00:28 +00:00
2022-01-14 01:09:54 +00:00
"github.com/nspcc-dev/neo-go/pkg/config"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/block"
2020-11-27 10:55:48 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
2021-05-28 11:55:06 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
2021-07-30 13:57:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
2021-07-30 13:57:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/io"
2020-05-22 09:17:17 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/capability"
2021-01-18 12:52:51 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/extpool"
2020-03-03 14:21:42 +00:00
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/util"
2019-11-15 10:32:40 +00:00
"go.uber.org/atomic"
2019-12-30 07:43:05 +00:00
"go.uber.org/zap"
2018-01-26 18:04:13 +00:00
)
const (
2019-10-22 14:56:03 +00:00
// peer numbers are arbitrary at the moment.
2021-05-04 14:54:16 +00:00
defaultMinPeers = 5
defaultAttemptConnPeers = 20
defaultMaxPeers = 100
defaultExtensiblePoolSize = 20
maxBlockBatch = 200
minPoolCount = 30
2018-01-26 18:04:13 +00:00
)
2018-03-14 09:36:59 +00:00
var (
2019-11-06 09:39:17 +00:00
errAlreadyConnected = errors . New ( "already connected" )
2018-03-14 09:36:59 +00:00
errIdenticalID = errors . New ( "identical node id" )
errInvalidNetwork = errors . New ( "invalid network" )
2019-11-06 12:17:20 +00:00
errMaxPeers = errors . New ( "max peers reached" )
2018-03-14 09:36:59 +00:00
errServerShutdown = errors . New ( "server shutdown" )
errInvalidInvType = errors . New ( "invalid inventory type" )
)
2018-02-01 20:28:45 +00:00
2018-03-14 09:36:59 +00:00
type (
2022-01-14 01:09:54 +00:00
// Ledger is everything Server needs from the blockchain.
Ledger interface {
extpool . Ledger
mempool . Feer
Blockqueuer
GetBlock ( hash util . Uint256 ) ( * block . Block , error )
GetConfig ( ) config . ProtocolConfiguration
GetHeader ( hash util . Uint256 ) ( * block . Header , error )
GetHeaderHash ( int ) util . Uint256
GetMaxVerificationGAS ( ) int64
GetMemPool ( ) * mempool . Pool
GetNotaryBalance ( acc util . Uint160 ) * big . Int
GetNotaryContractScriptHash ( ) util . Uint160
GetNotaryDepositExpiration ( acc util . Uint160 ) uint32
GetTransaction ( util . Uint256 ) ( * transaction . Transaction , uint32 , error )
HasBlock ( util . Uint256 ) bool
HeaderHeight ( ) uint32
P2PSigExtensionsEnabled ( ) bool
PoolTx ( t * transaction . Transaction , pools ... * mempool . Pool ) error
PoolTxWithData ( t * transaction . Transaction , data interface { } , mp * mempool . Pool , feer mempool . Feer , verificationFunction func ( t * transaction . Transaction , data interface { } ) error ) error
RegisterPostBlock ( f func ( func ( * transaction . Transaction , * mempool . Pool , bool ) bool , * mempool . Pool , * block . Block ) )
2022-08-19 17:47:55 +00:00
SubscribeForBlocks ( ch chan * block . Block )
UnsubscribeFromBlocks ( ch chan * block . Block )
2022-01-14 01:09:54 +00:00
}
2022-01-12 01:11:21 +00:00
// Service is a service abstraction (oracle, state root, consensus, etc).
Service interface {
2022-04-22 08:33:56 +00:00
Name ( ) string
2022-01-12 01:11:21 +00:00
Start ( )
Shutdown ( )
}
2018-03-14 09:36:59 +00:00
// Server represents the local Node in the network. Its transport could
// be of any kind.
Server struct {
2018-03-15 20:45:37 +00:00
// ServerConfig holds the Server configuration.
ServerConfig
2018-02-06 06:43:32 +00:00
2018-03-23 20:36:59 +00:00
// id is also known as the nonce of the server.
2018-03-14 09:36:59 +00:00
id uint32
2018-01-26 18:04:13 +00:00
2022-01-20 18:14:42 +00:00
// A copy of the Ledger's config.
config config . ProtocolConfiguration
2020-06-18 09:00:51 +00:00
2020-11-27 10:55:48 +00:00
transport Transporter
discovery Discoverer
2022-01-14 01:09:54 +00:00
chain Ledger
2020-11-27 10:55:48 +00:00
bQueue * blockQueue
2021-07-30 13:57:42 +00:00
bSyncQueue * blockQueue
2021-08-03 19:28:16 +00:00
mempool * mempool . Pool
2020-11-27 10:55:48 +00:00
notaryRequestPool * mempool . Pool
2021-01-18 12:52:51 +00:00
extensiblePool * extpool . Pool
2020-12-30 08:01:13 +00:00
notaryFeer NotaryFeer
2022-07-27 08:25:58 +00:00
serviceLock sync . RWMutex
services map [ string ] Service
extensHandlers map [ string ] func ( * payload . Extensible ) error
txCallback func ( * transaction . Transaction )
2018-01-26 18:04:13 +00:00
2021-08-03 19:43:31 +00:00
txInLock sync . Mutex
txInMap map [ util . Uint256 ] struct { }
2018-03-14 09:36:59 +00:00
lock sync . RWMutex
peers map [ Peer ] bool
2018-01-26 20:39:34 +00:00
2021-09-27 13:13:37 +00:00
// lastRequestedBlock contains a height of the last requested block.
lastRequestedBlock atomic . Uint32
// lastRequestedHeader contains a height of the last requested header.
lastRequestedHeader atomic . Uint32
2020-11-26 15:53:10 +00:00
2018-03-14 09:36:59 +00:00
register chan Peer
unregister chan peerDrop
quit chan struct { }
2022-08-19 17:43:15 +00:00
relayFin chan struct { }
2019-11-15 10:32:40 +00:00
2020-02-18 15:11:55 +00:00
transactions chan * transaction . Transaction
2021-04-02 09:55:56 +00:00
syncReached * atomic . Bool
2019-12-30 07:43:05 +00:00
2022-01-12 21:20:03 +00:00
stateSync StateSync
2020-09-28 11:58:04 +00:00
2019-12-30 07:43:05 +00:00
log * zap . Logger
2018-03-14 09:36:59 +00:00
}
2018-03-10 12:04:06 +00:00
2018-03-14 09:36:59 +00:00
peerDrop struct {
peer Peer
reason error
2018-02-01 08:00:42 +00:00
}
2018-03-14 09:36:59 +00:00
)
2019-11-15 17:42:23 +00:00
func randomID ( ) uint32 {
buf := make ( [ ] byte , 4 )
_ , _ = rand . Read ( buf )
return binary . BigEndian . Uint32 ( buf )
}
2018-03-14 09:36:59 +00:00
// NewServer returns a new Server, initialized with the given configuration.
2022-01-14 01:09:54 +00:00
func NewServer ( config ServerConfig , chain Ledger , stSync StateSync , log * zap . Logger ) ( * Server , error ) {
2022-01-12 21:20:03 +00:00
return newServerFromConstructors ( config , chain , stSync , log , func ( s * Server ) Transporter {
2020-12-07 09:52:19 +00:00
return NewTCPTransport ( s , net . JoinHostPort ( s . ServerConfig . Address , strconv . Itoa ( int ( s . ServerConfig . Port ) ) ) , s . log )
2022-01-12 20:04:07 +00:00
} , newDefaultDiscovery )
2020-12-07 09:52:19 +00:00
}
2022-01-14 01:09:54 +00:00
func newServerFromConstructors ( config ServerConfig , chain Ledger , stSync StateSync , log * zap . Logger ,
2020-12-07 09:52:19 +00:00
newTransport func ( * Server ) Transporter ,
newDiscovery func ( [ ] string , time . Duration , Transporter ) Discoverer ,
) ( * Server , error ) {
2019-12-30 07:43:05 +00:00
if log == nil {
2020-01-22 08:17:51 +00:00
return nil , errors . New ( "logger is a required parameter" )
2019-12-30 07:43:05 +00:00
}
2021-05-04 14:54:16 +00:00
if config . ExtensiblePoolSize <= 0 {
config . ExtensiblePoolSize = defaultExtensiblePoolSize
log . Info ( "ExtensiblePoolSize is not set or wrong, using default value" ,
zap . Int ( "ExtensiblePoolSize" , config . ExtensiblePoolSize ) )
}
2018-03-09 15:55:25 +00:00
s := & Server {
2022-01-20 18:14:42 +00:00
ServerConfig : config ,
chain : chain ,
id : randomID ( ) ,
config : chain . GetConfig ( ) ,
quit : make ( chan struct { } ) ,
2022-08-19 17:43:15 +00:00
relayFin : make ( chan struct { } ) ,
2022-01-20 18:14:42 +00:00
register : make ( chan Peer ) ,
unregister : make ( chan peerDrop ) ,
txInMap : make ( map [ util . Uint256 ] struct { } ) ,
peers : make ( map [ Peer ] bool ) ,
syncReached : atomic . NewBool ( false ) ,
mempool : chain . GetMemPool ( ) ,
extensiblePool : extpool . New ( chain , config . ExtensiblePoolSize ) ,
log : log ,
transactions : make ( chan * transaction . Transaction , 64 ) ,
2022-04-22 08:33:56 +00:00
services : make ( map [ string ] Service ) ,
2022-01-20 18:14:42 +00:00
extensHandlers : make ( map [ string ] func ( * payload . Extensible ) error ) ,
stateSync : stSync ,
2018-01-26 18:04:13 +00:00
}
2020-11-27 10:55:48 +00:00
if chain . P2PSigExtensionsEnabled ( ) {
2020-12-30 08:01:13 +00:00
s . notaryFeer = NewNotaryFeer ( chain )
2022-01-20 18:14:42 +00:00
s . notaryRequestPool = mempool . New ( s . config . P2PNotaryRequestPayloadPoolSize , 1 , true )
2022-01-14 01:09:54 +00:00
chain . RegisterPostBlock ( func ( isRelevant func ( * transaction . Transaction , * mempool . Pool , bool ) bool , txpool * mempool . Pool , _ * block . Block ) {
2020-11-27 10:55:48 +00:00
s . notaryRequestPool . RemoveStale ( func ( t * transaction . Transaction ) bool {
2022-01-14 01:09:54 +00:00
return isRelevant ( t , txpool , true )
2020-12-30 08:01:13 +00:00
} , s . notaryFeer )
2020-11-27 10:55:48 +00:00
} )
}
2020-02-14 17:46:05 +00:00
s . bQueue = newBlockQueue ( maxBlockBatch , chain , log , func ( b * block . Block ) {
2021-06-04 17:29:47 +00:00
s . tryStartServices ( )
2020-02-14 17:46:05 +00:00
} )
2018-01-26 18:04:13 +00:00
2022-01-12 21:20:03 +00:00
s . bSyncQueue = newBlockQueue ( maxBlockBatch , s . stateSync , log , nil )
2021-07-30 13:57:42 +00:00
2020-01-13 12:22:21 +00:00
if s . MinPeers < 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad MinPeers configured, using the default value" ,
zap . Int ( "configured" , s . MinPeers ) ,
zap . Int ( "actual" , defaultMinPeers ) )
2019-11-01 10:29:54 +00:00
s . MinPeers = defaultMinPeers
}
2019-11-06 12:17:20 +00:00
if s . MaxPeers <= 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad MaxPeers configured, using the default value" ,
zap . Int ( "configured" , s . MaxPeers ) ,
zap . Int ( "actual" , defaultMaxPeers ) )
2019-11-06 12:17:20 +00:00
s . MaxPeers = defaultMaxPeers
}
if s . AttemptConnPeers <= 0 {
2019-12-30 07:43:05 +00:00
s . log . Info ( "bad AttemptConnPeers configured, using the default value" ,
zap . Int ( "configured" , s . AttemptConnPeers ) ,
zap . Int ( "actual" , defaultAttemptConnPeers ) )
2019-11-06 12:17:20 +00:00
s . AttemptConnPeers = defaultAttemptConnPeers
}
2020-12-07 09:52:19 +00:00
s . transport = newTransport ( s )
s . discovery = newDiscovery (
2020-10-13 13:30:10 +00:00
s . Seeds ,
2018-03-14 09:36:59 +00:00
s . DialTimeout ,
s . transport ,
)
2018-01-26 18:04:13 +00:00
2020-01-22 08:17:51 +00:00
return s , nil
2018-01-30 10:56:36 +00:00
}
2018-03-23 20:36:59 +00:00
// ID returns the server's ID.
func ( s * Server ) ID ( ) uint32 {
return s . id
}
2022-07-04 20:03:50 +00:00
// Start will start the server and its underlying transport. Calling it twice
// is an error.
2018-03-23 20:36:59 +00:00
func ( s * Server ) Start ( errChan chan error ) {
2019-12-30 07:43:05 +00:00
s . log . Info ( "node started" ,
zap . Uint32 ( "blockHeight" , s . chain . BlockHeight ( ) ) ,
zap . Uint32 ( "headerHeight" , s . chain . HeaderHeight ( ) ) )
2018-03-17 11:53:21 +00:00
2021-04-02 09:55:56 +00:00
s . tryStartServices ( )
2020-11-27 10:55:48 +00:00
s . initStaleMemPools ( )
2020-01-13 12:22:21 +00:00
2020-02-18 15:11:55 +00:00
go s . broadcastTxLoop ( )
2020-05-07 20:00:38 +00:00
go s . relayBlocksLoop ( )
2019-09-25 16:54:31 +00:00
go s . bQueue . run ( )
2021-07-30 13:57:42 +00:00
go s . bSyncQueue . run ( )
2018-03-14 09:36:59 +00:00
go s . transport . Accept ( )
2019-10-29 17:51:17 +00:00
setServerAndNodeVersions ( s . UserAgent , strconv . FormatUint ( uint64 ( s . id ) , 10 ) )
2018-03-14 09:36:59 +00:00
s . run ( )
2018-01-31 19:11:08 +00:00
}
2018-01-30 10:56:36 +00:00
2022-07-04 20:03:50 +00:00
// Shutdown disconnects all peers and stops listening. Calling it twice is an error;
// once stopped, the same instance of the Server can't be started again by calling Start.
2018-03-23 20:36:59 +00:00
func ( s * Server ) Shutdown ( ) {
2019-12-30 07:43:05 +00:00
s . log . Info ( "shutting down server" , zap . Int ( "peers" , s . PeerCount ( ) ) )
2020-02-24 12:54:18 +00:00
s . transport . Close ( )
2020-02-24 12:39:31 +00:00
s . discovery . Close ( )
2021-08-06 12:04:13 +00:00
for _ , p := range s . getPeers ( nil ) {
2020-02-24 12:54:18 +00:00
p . Disconnect ( errServerShutdown )
}
2019-09-25 16:54:31 +00:00
s . bQueue . discard ( )
2021-07-30 13:57:42 +00:00
s . bSyncQueue . discard ( )
2022-07-27 08:25:58 +00:00
s . serviceLock . RLock ( )
2022-01-12 01:11:21 +00:00
for _ , svc := range s . services {
svc . Shutdown ( )
2021-05-28 11:55:06 +00:00
}
2022-07-27 08:25:58 +00:00
s . serviceLock . RUnlock ( )
2021-05-28 11:55:06 +00:00
if s . chain . P2PSigExtensionsEnabled ( ) {
2021-01-15 12:40:15 +00:00
s . notaryRequestPool . StopSubscriptions ( )
}
2018-03-23 20:36:59 +00:00
close ( s . quit )
2022-08-19 17:43:15 +00:00
<- s . relayFin
2018-03-23 20:36:59 +00:00
}
2022-01-12 02:01:34 +00:00
// AddService adds a service to be started/stopped by the Server.
func ( s * Server ) AddService ( svc Service ) {
2022-07-27 08:25:58 +00:00
s . serviceLock . Lock ( )
defer s . serviceLock . Unlock ( )
s . addService ( svc )
}
// addService is an unlocked version of AddService.
func ( s * Server ) addService ( svc Service ) {
2022-04-22 08:33:56 +00:00
s . services [ svc . Name ( ) ] = svc
2020-09-28 11:58:04 +00:00
}
2022-04-20 18:30:09 +00:00
// AddExtensibleService registers a service that handles an extensible payload of some kind.
2022-01-12 18:09:37 +00:00
func ( s * Server ) AddExtensibleService ( svc Service , category string , handler func ( * payload . Extensible ) error ) {
2022-07-27 08:25:58 +00:00
s . serviceLock . Lock ( )
defer s . serviceLock . Unlock ( )
s . addExtensibleService ( svc , category , handler )
}
// addExtensibleService is an unlocked version of AddExtensibleService.
func ( s * Server ) addExtensibleService ( svc Service , category string , handler func ( * payload . Extensible ) error ) {
2022-01-12 18:09:37 +00:00
s . extensHandlers [ category ] = handler
2022-07-27 08:25:58 +00:00
s . addService ( svc )
2021-02-01 16:00:07 +00:00
}
2022-07-28 15:30:14 +00:00
// AddConsensusService registers consensus service that handles transactions and dBFT extensible payloads.
func ( s * Server ) AddConsensusService ( svc Service , handler func ( * payload . Extensible ) error , txCallback func ( * transaction . Transaction ) ) {
2022-07-27 08:25:58 +00:00
s . serviceLock . Lock ( )
defer s . serviceLock . Unlock ( )
2022-01-12 20:04:07 +00:00
s . txCallback = txCallback
2022-07-28 15:30:14 +00:00
s . addExtensibleService ( svc , payload . ConsensusCategory , handler )
2022-07-27 08:25:58 +00:00
}
// DelService drops a service from the list, use it when the service is stopped
// outside of the Server.
func ( s * Server ) DelService ( svc Service ) {
s . serviceLock . Lock ( )
defer s . serviceLock . Unlock ( )
s . delService ( svc )
}
// delService is an unlocked version of DelService.
func ( s * Server ) delService ( svc Service ) {
delete ( s . services , svc . Name ( ) )
}
// DelExtensibleService drops a service that handles extensible payloads from the
// list, use it when the service is stopped outside of the Server.
func ( s * Server ) DelExtensibleService ( svc Service , category string ) {
s . serviceLock . Lock ( )
defer s . serviceLock . Unlock ( )
s . delExtensibleService ( svc , category )
}
// delExtensibleService is an unlocked version of DelExtensibleService.
func ( s * Server ) delExtensibleService ( svc Service , category string ) {
delete ( s . extensHandlers , category )
s . delService ( svc )
}
2022-07-28 15:30:14 +00:00
// DelConsensusService unregisters consensus service that handles transactions and dBFT extensible payloads.
func ( s * Server ) DelConsensusService ( svc Service ) {
2022-07-27 08:25:58 +00:00
s . serviceLock . Lock ( )
defer s . serviceLock . Unlock ( )
s . txCallback = nil
2022-07-28 15:30:14 +00:00
s . delExtensibleService ( svc , payload . ConsensusCategory )
2022-01-12 20:04:07 +00:00
}
2022-01-12 20:21:09 +00:00
// GetNotaryPool returns the notary pool, if it's configured.
func ( s * Server ) GetNotaryPool ( ) * mempool . Pool {
return s . notaryRequestPool
}
2018-04-09 16:58:09 +00:00
// UnconnectedPeers returns a list of peers that are in the discovery peer list
// but are not connected to the server.
2018-03-23 20:36:59 +00:00
func ( s * Server ) UnconnectedPeers ( ) [ ] string {
2020-01-10 12:16:14 +00:00
return s . discovery . UnconnectedPeers ( )
2018-03-23 20:36:59 +00:00
}
2022-04-20 18:30:09 +00:00
// BadPeers returns a list of peers that are flagged as "bad" peers.
2018-03-23 20:36:59 +00:00
func ( s * Server ) BadPeers ( ) [ ] string {
2020-01-10 12:16:14 +00:00
return s . discovery . BadPeers ( )
2018-03-23 20:36:59 +00:00
}
2020-01-10 12:13:29 +00:00
// ConnectedPeers returns a list of currently connected peers.
func ( s * Server ) ConnectedPeers ( ) [ ] string {
s . lock . RLock ( )
defer s . lock . RUnlock ( )
peers := make ( [ ] string , 0 , len ( s . peers ) )
for k := range s . peers {
peers = append ( peers , k . PeerAddr ( ) . String ( ) )
}
return peers
}
2020-01-27 09:44:05 +00:00
// run is a goroutine that starts another goroutine to manage protocol specifics
// while itself dealing with peers management (handling connects/disconnects).
2018-03-14 09:36:59 +00:00
func ( s * Server ) run ( ) {
2020-01-27 09:44:05 +00:00
go s . runProto ( )
2018-03-09 15:55:25 +00:00
for {
2019-11-06 12:17:20 +00:00
if s . PeerCount ( ) < s . MinPeers {
s . discovery . RequestRemote ( s . AttemptConnPeers )
2019-09-12 13:19:18 +00:00
}
2019-09-13 09:03:07 +00:00
if s . discovery . PoolCount ( ) < minPoolCount {
2020-05-21 10:35:44 +00:00
s . broadcastHPMessage ( NewMessage ( CMDGetAddr , payload . NewNullPayload ( ) ) )
2019-09-13 09:03:07 +00:00
}
2018-03-14 09:36:59 +00:00
select {
case <- s . quit :
return
case p := <- s . register :
2019-11-06 09:38:47 +00:00
s . lock . Lock ( )
2018-03-14 09:36:59 +00:00
s . peers [ p ] = true
2019-11-06 09:38:47 +00:00
s . lock . Unlock ( )
2019-11-06 12:17:20 +00:00
peerCount := s . PeerCount ( )
2020-01-28 16:39:12 +00:00
s . log . Info ( "new peer connected" , zap . Stringer ( "addr" , p . RemoteAddr ( ) ) , zap . Int ( "peerCount" , peerCount ) )
2019-11-06 12:17:20 +00:00
if peerCount > s . MaxPeers {
s . lock . RLock ( )
// Pick a random peer and drop connection to it.
for peer := range s . peers {
2020-02-24 09:39:46 +00:00
// It will send us unregister signal.
go peer . Disconnect ( errMaxPeers )
2019-11-06 12:17:20 +00:00
break
}
s . lock . RUnlock ( )
}
2019-10-29 17:51:17 +00:00
updatePeersConnectedMetric ( s . PeerCount ( ) )
2018-03-14 09:36:59 +00:00
case drop := <- s . unregister :
2019-11-06 09:38:47 +00:00
s . lock . Lock ( )
2019-09-13 12:36:53 +00:00
if s . peers [ drop . peer ] {
delete ( s . peers , drop . peer )
2019-11-06 09:38:47 +00:00
s . lock . Unlock ( )
2019-12-30 07:43:05 +00:00
s . log . Warn ( "peer disconnected" ,
zap . Stringer ( "addr" , drop . peer . RemoteAddr ( ) ) ,
2021-11-01 09:19:00 +00:00
zap . Error ( drop . reason ) ,
2019-12-30 07:43:05 +00:00
zap . Int ( "peerCount" , s . PeerCount ( ) ) )
2019-11-06 07:55:21 +00:00
addr := drop . peer . PeerAddr ( ) . String ( )
2022-09-02 11:29:47 +00:00
if errors . Is ( drop . reason , errIdenticalID ) {
2019-11-27 08:56:56 +00:00
s . discovery . RegisterBadAddr ( addr )
2022-09-02 11:29:47 +00:00
} else if errors . Is ( drop . reason , errAlreadyConnected ) {
2020-12-23 13:13:57 +00:00
// There is a race condition where a peer can be disconnected twice for this reason,
// which can lead to no connections to the peer at all. Here we check for such a possibility.
stillConnected := false
s . lock . RLock ( )
verDrop := drop . peer . Version ( )
addr := drop . peer . PeerAddr ( ) . String ( )
if verDrop != nil {
for peer := range s . peers {
ver := peer . Version ( )
// Another connection to the same peer is still registered.
if ver != nil && ver . Nonce == verDrop . Nonce && peer . PeerAddr ( ) . String ( ) == addr {
stillConnected = true
}
}
}
s . lock . RUnlock ( )
if ! stillConnected {
s . discovery . UnregisterConnectedAddr ( addr )
s . discovery . BackFill ( addr )
}
} else {
2019-11-27 08:56:56 +00:00
s . discovery . UnregisterConnectedAddr ( addr )
s . discovery . BackFill ( addr )
}
2019-10-29 17:51:17 +00:00
updatePeersConnectedMetric ( s . PeerCount ( ) )
2019-11-06 09:38:47 +00:00
} else {
// else the peer is already gone, which can happen
// because we have two goroutines sending signals here
s . lock . Unlock ( )
2019-09-13 12:36:53 +00:00
}
2018-03-09 15:55:25 +00:00
}
2018-01-31 19:11:08 +00:00
}
2018-01-27 12:39:07 +00:00
}
2020-01-27 09:44:05 +00:00
// runProto is a goroutine that manages server-wide protocol events.
func ( s * Server ) runProto ( ) {
pingTimer := time . NewTimer ( s . PingInterval )
for {
prevHeight := s . chain . BlockHeight ( )
select {
case <- s . quit :
return
case <- pingTimer . C :
if s . chain . BlockHeight ( ) == prevHeight {
// Get a copy of s.peers to avoid holding a lock while sending.
2021-08-06 12:04:13 +00:00
for _ , peer := range s . getPeers ( nil ) {
2021-08-06 08:26:19 +00:00
_ = peer . SendPing ( NewMessage ( CMDPing , payload . NewPing ( s . chain . BlockHeight ( ) , s . id ) ) )
2020-01-27 09:44:05 +00:00
}
}
pingTimer . Reset ( s . PingInterval )
}
}
}
2021-04-02 09:55:56 +00:00
func ( s * Server ) tryStartServices ( ) {
if s . syncReached . Load ( ) {
2019-11-15 10:32:40 +00:00
return
}
2021-04-02 09:55:56 +00:00
if s . IsInSync ( ) && s . syncReached . CAS ( false , true ) {
s . log . Info ( "node reached synchronized state, starting services" )
2021-05-28 11:55:06 +00:00
if s . chain . P2PSigExtensionsEnabled ( ) {
s . notaryRequestPool . RunSubscriptions ( ) // WSClient is also a subscriber.
}
2022-07-27 08:25:58 +00:00
s . serviceLock . RLock ( )
2022-01-12 01:11:21 +00:00
for _ , svc := range s . services {
svc . Start ( )
2021-04-02 10:12:06 +00:00
}
2022-07-27 08:25:58 +00:00
s . serviceLock . RUnlock ( )
2019-11-15 10:32:40 +00:00
}
}
2022-04-20 18:30:09 +00:00
// SubscribeForNotaryRequests adds the given channel to a notary request event
2021-05-28 11:55:06 +00:00
// broadcasting, so when a new P2PNotaryRequest is received or an existing
2022-04-20 18:30:09 +00:00
// P2PNotaryRequest is removed from the pool you'll receive it via this channel.
2021-05-28 11:55:06 +00:00
// Make sure it's read from regularly as not reading these events might affect
// other Server functions.
// Ensure that P2PSigExtensions are enabled before calling this method.
func ( s * Server ) SubscribeForNotaryRequests ( ch chan <- mempoolevent . Event ) {
if ! s . chain . P2PSigExtensionsEnabled ( ) {
panic ( "P2PSigExtensions are disabled" )
}
s . notaryRequestPool . SubscribeForTransactions ( ch )
}
2022-04-20 18:30:09 +00:00
// UnsubscribeFromNotaryRequests unsubscribes the given channel from notary request
2021-05-28 11:55:06 +00:00
// notifications, you can close it afterwards. Passing non-subscribed channel
// is a no-op.
// Ensure that P2PSigExtensions are enabled before calling this method.
func ( s * Server ) UnsubscribeFromNotaryRequests ( ch chan <- mempoolevent . Event ) {
if ! s . chain . P2PSigExtensionsEnabled ( ) {
panic ( "P2PSigExtensions are disabled" )
}
s . notaryRequestPool . UnsubscribeFromTransactions ( ch )
}
2022-04-20 18:30:09 +00:00
// getPeers returns the current list of the peers connected to the server filtered by
2021-08-06 12:04:13 +00:00
// isOK function if it's given.
func ( s * Server ) getPeers ( isOK func ( Peer ) bool ) [ ] Peer {
2019-11-15 10:32:40 +00:00
s . lock . RLock ( )
defer s . lock . RUnlock ( )
2021-08-05 20:59:53 +00:00
peers := make ( [ ] Peer , 0 , len ( s . peers ) )
for k := range s . peers {
2021-08-06 12:04:13 +00:00
if isOK != nil && ! isOK ( k ) {
continue
}
2021-08-05 20:59:53 +00:00
peers = append ( peers , k )
2019-11-15 10:32:40 +00:00
}
return peers
2018-03-23 20:36:59 +00:00
}
2022-04-20 18:30:09 +00:00
// PeerCount returns the number of the currently connected peers.
2018-03-14 09:36:59 +00:00
func ( s * Server ) PeerCount ( ) int {
s . lock . RLock ( )
defer s . lock . RUnlock ( )
return len ( s . peers )
2018-02-01 20:28:45 +00:00
}
2022-04-20 18:30:09 +00:00
// HandshakedPeersCount returns the number of the connected peers
2019-12-02 07:51:45 +00:00
// which have already performed handshake.
func ( s * Server ) HandshakedPeersCount ( ) int {
s . lock . RLock ( )
defer s . lock . RUnlock ( )
var count int
for p := range s . peers {
if p . Handshaked ( ) {
count ++
}
}
return count
}
2018-02-01 20:28:45 +00:00
2022-04-20 18:30:09 +00:00
// getVersionMsg returns the current version message.
2020-05-22 09:17:17 +00:00
func ( s * Server ) getVersionMsg ( ) ( * Message , error ) {
2020-06-10 07:01:21 +00:00
port , err := s . Port ( )
2020-05-22 09:17:17 +00:00
if err != nil {
2020-06-10 07:01:21 +00:00
return nil , err
2020-05-22 09:17:17 +00:00
}
capabilities := [ ] capability . Capability {
{
Type : capability . TCPServer ,
Data : & capability . Server {
Port : port ,
} ,
} ,
}
if s . Relay {
capabilities = append ( capabilities , capability . Capability {
Type : capability . FullNode ,
Data : & capability . Node {
StartHeight : s . chain . BlockHeight ( ) ,
} ,
} )
}
2018-03-14 09:36:59 +00:00
payload := payload . NewVersion (
2020-05-21 10:35:44 +00:00
s . Net ,
2018-03-14 09:36:59 +00:00
s . id ,
s . UserAgent ,
2020-05-22 09:17:17 +00:00
capabilities ,
2018-03-09 15:55:25 +00:00
)
2020-05-22 09:17:17 +00:00
return NewMessage ( CMDVersion , payload ) , nil
2018-03-14 09:36:59 +00:00
}
2018-03-09 15:55:25 +00:00
2020-02-14 17:46:05 +00:00
// IsInSync answers the question of whether the server is in sync with the
// network or not (at least how the server itself sees it). The server operates
// with the data that it has, the number of peers (that has to be more than
2022-04-20 18:30:09 +00:00
// minimum number) and the height of these peers (our chain has to be not lower
// than 2/3 of our peers have). Ideally, we would check for the highest of the
2020-02-14 17:46:05 +00:00
// peers, but the problem is that they can lie to us and send whatever height
2022-07-05 09:20:31 +00:00
// they want to. Once sync is reached, IsInSync will always return `true`, even if
// the server is temporarily out of sync after that.
2020-02-14 17:46:05 +00:00
func ( s * Server ) IsInSync ( ) bool {
2022-07-05 09:20:31 +00:00
if s . syncReached . Load ( ) {
return true
}
2020-02-14 17:46:05 +00:00
var peersNumber int
var notHigher int
2021-07-30 13:57:42 +00:00
if s . stateSync . IsActive ( ) {
return false
}
2020-02-14 17:46:05 +00:00
if s . MinPeers == 0 {
return true
}
ourLastBlock := s . chain . BlockHeight ( )
s . lock . RLock ( )
for p := range s . peers {
if p . Handshaked ( ) {
peersNumber ++
if ourLastBlock >= p . LastBlockIndex ( ) {
notHigher ++
}
}
}
s . lock . RUnlock ( )
// Checking bQueue would also be nice, but it can be filled with garbage
// easily at the moment.
return peersNumber >= s . MinPeers && ( 3 * notHigher > 2 * peersNumber ) // && s.bQueue.length() == 0
}
2022-04-20 18:30:09 +00:00
// When a peer sends out its version, we reply with verack after validating
2018-03-14 09:36:59 +00:00
// the version.
func ( s * Server ) handleVersionCmd ( p Peer , version * payload . Version ) error {
2019-09-13 12:43:22 +00:00
err := p . HandleVersion ( version )
if err != nil {
return err
2018-03-14 09:36:59 +00:00
}
if s . id == version . Nonce {
return errIdenticalID
2018-01-28 13:59:32 +00:00
}
2022-04-20 18:30:09 +00:00
// Make sure both the server and the peer are operating on
2020-05-21 10:35:44 +00:00
// the same network.
if s . Net != version . Magic {
return errInvalidNetwork
}
2019-11-06 09:39:17 +00:00
peerAddr := p . PeerAddr ( ) . String ( )
2020-01-28 16:10:13 +00:00
s . discovery . RegisterConnectedAddr ( peerAddr )
2019-11-06 09:39:17 +00:00
s . lock . RLock ( )
for peer := range s . peers {
2020-01-28 10:54:09 +00:00
if p == peer {
continue
}
ver := peer . Version ( )
2019-11-06 09:39:17 +00:00
// Already connected, drop this connection.
2020-01-28 10:54:09 +00:00
if ver != nil && ver . Nonce == version . Nonce && peer . PeerAddr ( ) . String ( ) == peerAddr {
2019-11-06 09:39:17 +00:00
s . lock . RUnlock ( )
return errAlreadyConnected
}
}
s . lock . RUnlock ( )
2020-12-23 12:32:16 +00:00
return p . SendVersionAck ( NewMessage ( CMDVerack , payload . NewNullPayload ( ) ) )
2018-01-28 13:59:32 +00:00
}
2022-04-20 18:30:09 +00:00
// handleBlockCmd processes the block received from its peer.
2020-01-14 12:32:07 +00:00
func ( s * Server ) handleBlockCmd ( p Peer , block * block . Block ) error {
2021-07-30 13:57:42 +00:00
if s . stateSync . IsActive ( ) {
return s . bSyncQueue . putBlock ( block )
}
2019-09-25 16:54:31 +00:00
return s . bQueue . putBlock ( block )
2018-03-14 09:36:59 +00:00
}
2018-02-01 08:00:42 +00:00
2022-04-20 18:30:09 +00:00
// handlePing processes a ping request.
2020-01-17 10:17:19 +00:00
func ( s * Server ) handlePing ( p Peer , ping * payload . Ping ) error {
2020-08-14 13:22:15 +00:00
err := p . HandlePing ( ping )
if err != nil {
return err
}
2021-07-30 13:57:42 +00:00
err = s . requestBlocksOrHeaders ( p )
if err != nil {
return err
2020-08-14 13:22:15 +00:00
}
2020-05-21 10:35:44 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDPong , payload . NewPing ( s . chain . BlockHeight ( ) , s . id ) ) )
2020-01-17 10:17:19 +00:00
}
2021-07-30 13:57:42 +00:00
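// requestBlocksOrHeaders requests headers from the peer while state sync still
// needs them. Otherwise, if the peer is ahead of us, it requests blocks and,
// when state sync needs them, MPT nodes.
func ( s * Server ) requestBlocksOrHeaders ( p Peer ) error {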
if s . stateSync . NeedHeaders ( ) {
if s . chain . HeaderHeight ( ) < p . LastBlockIndex ( ) {
return s . requestHeaders ( p )
}
return nil
}
2021-08-13 09:46:23 +00:00
var (
2022-01-14 01:09:54 +00:00
bq Blockqueuer = s . chain
2021-08-13 09:46:23 +00:00
requestMPTNodes bool
)
2021-07-30 13:57:42 +00:00
if s . stateSync . IsActive ( ) {
bq = s . stateSync
2021-08-13 09:46:23 +00:00
requestMPTNodes = s . stateSync . NeedMPTNodes ( )
}
if bq . BlockHeight ( ) >= p . LastBlockIndex ( ) {
return nil
}
err := s . requestBlocks ( bq , p )
if err != nil {
return err
2021-07-30 13:57:42 +00:00
}
2021-08-13 09:46:23 +00:00
if requestMPTNodes {
return s . requestMPTNodes ( p , s . stateSync . GetUnknownMPTNodesBatch ( payload . MaxMPTHashesCount ) )
2021-07-30 13:57:42 +00:00
}
return nil
}
// requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers.
func ( s * Server ) requestHeaders ( p Peer ) error {
2021-09-27 13:13:37 +00:00
pl := getRequestBlocksPayload ( p , s . chain . HeaderHeight ( ) , & s . lastRequestedHeader )
return p . EnqueueP2PMessage ( NewMessage ( CMDGetHeaders , pl ) )
2021-07-30 13:57:42 +00:00
}
2022-04-20 18:30:09 +00:00
// handlePong processes a pong response.
2020-01-17 10:17:19 +00:00
func ( s * Server ) handlePong ( p Peer , pong * payload . Ping ) error {
2020-01-20 16:02:19 +00:00
err := p . HandlePong ( pong )
if err != nil {
return err
2020-01-17 10:17:19 +00:00
}
2021-07-30 13:57:42 +00:00
return s . requestBlocksOrHeaders ( p )
2020-01-17 10:17:19 +00:00
}
2019-10-22 14:56:03 +00:00
// handleInvCmd processes the received inventory.
2018-03-14 09:36:59 +00:00
func ( s * Server ) handleInvCmd ( p Peer , inv * payload . Inventory ) error {
2019-12-02 08:02:52 +00:00
reqHashes := make ( [ ] util . Uint256 , 0 )
var typExists = map [ payload . InventoryType ] func ( util . Uint256 ) bool {
2021-08-03 19:28:16 +00:00
payload . TXType : s . mempool . ContainsKey ,
2019-12-02 08:02:52 +00:00
payload . BlockType : s . chain . HasBlock ,
2021-01-14 13:38:40 +00:00
payload . ExtensibleType : func ( h util . Uint256 ) bool {
2021-01-18 12:52:51 +00:00
cp := s . extensiblePool . Get ( h )
2019-12-02 08:02:52 +00:00
return cp != nil
} ,
2020-11-27 10:55:48 +00:00
payload . P2PNotaryRequestType : func ( h util . Uint256 ) bool {
return s . notaryRequestPool . ContainsKey ( h )
} ,
2019-12-02 08:02:52 +00:00
}
if exists := typExists [ inv . Type ] ; exists != nil {
for _ , hash := range inv . Hashes {
if ! exists ( hash ) {
reqHashes = append ( reqHashes , hash )
}
}
}
if len ( reqHashes ) > 0 {
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDGetData , payload . NewInventory ( inv . Type , reqHashes ) )
2020-01-16 18:16:31 +00:00
pkt , err := msg . Bytes ( )
if err != nil {
return err
}
2021-01-14 13:38:40 +00:00
if inv . Type == payload . ExtensibleType {
2022-10-10 19:48:06 +00:00
return p . EnqueueHPPacket ( pkt )
2020-01-16 18:16:31 +00:00
}
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PPacket ( pkt )
2019-12-02 08:02:52 +00:00
}
return nil
2018-03-09 15:55:25 +00:00
}
2018-02-01 08:00:42 +00:00
2020-06-19 12:03:40 +00:00
// handleMempoolCmd handles getmempool command.
func ( s * Server ) handleMempoolCmd ( p Peer ) error {
2021-08-03 19:28:16 +00:00
txs := s . mempool . GetVerifiedTransactions ( )
2020-06-19 12:03:40 +00:00
hs := make ( [ ] util . Uint256 , 0 , payload . MaxHashesCount )
for i := range txs {
hs = append ( hs , txs [ i ] . Hash ( ) )
if len ( hs ) < payload . MaxHashesCount && i != len ( txs ) - 1 {
continue
}
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . TXType , hs ) )
err := p . EnqueueP2PMessage ( msg )
if err != nil {
return err
}
hs = hs [ : 0 ]
}
return nil
}
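// Illustrative sketch (not part of the server): handleMempoolCmd above sends
// transaction hashes in inventories of at most payload.MaxHashesCount entries,
// flushing when the buffer is full or the last transaction is reached. The
// helper below shows the same chunking idea in isolation. It returns the
// batches instead of reusing one buffer, and the name batches, the string
// element type and the max parameter are assumptions made for the example.

```go
package main

// batches splits hashes into consecutive slices of at most max elements.
// Every input element appears in exactly one batch, in the original order.
func batches(hashes []string, max int) [][]string {
	if max <= 0 {
		return nil
	}
	var out [][]string
	for len(hashes) > 0 {
		n := max
		if len(hashes) < n {
			n = len(hashes)
		}
		// The full slice expression caps capacity so that appending to a
		// batch can never overwrite the next one.
		out = append(out, hashes[:n:n])
		hashes = hashes[n:]
	}
	return out
}
```

// Five hashes with max = 2 give three batches of sizes 2, 2 and 1; an empty
// input gives no batches, which matches sending no inventory at all.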
2019-10-24 07:18:30 +00:00
// handleGetDataCmd processes the received getdata request.
func ( s * Server ) handleGetDataCmd ( p Peer , inv * payload . Inventory ) error {
2020-07-08 12:25:58 +00:00
var notFound [ ] util . Uint256
2020-01-16 18:16:31 +00:00
for _ , hash := range inv . Hashes {
var msg * Message
switch inv . Type {
case payload . TXType :
2019-10-24 07:18:30 +00:00
tx , _ , err := s . chain . GetTransaction ( hash )
if err == nil {
2020-05-21 10:35:44 +00:00
msg = NewMessage ( CMDTX , tx )
2020-07-08 12:25:58 +00:00
} else {
notFound = append ( notFound , hash )
2019-10-24 07:18:30 +00:00
}
2020-01-16 18:16:31 +00:00
case payload . BlockType :
2019-10-24 07:18:30 +00:00
b , err := s . chain . GetBlock ( hash )
if err == nil {
2020-05-21 10:35:44 +00:00
msg = NewMessage ( CMDBlock , b )
2020-07-08 12:25:58 +00:00
} else {
notFound = append ( notFound , hash )
2019-10-24 07:18:30 +00:00
}
2021-01-14 13:38:40 +00:00
case payload . ExtensibleType :
2021-01-18 12:52:51 +00:00
if cp := s . extensiblePool . Get ( hash ) ; cp != nil {
2021-01-14 13:38:40 +00:00
msg = NewMessage ( CMDExtensible , cp )
2020-01-16 18:16:31 +00:00
}
2020-11-27 10:55:48 +00:00
case payload . P2PNotaryRequestType :
if nrp , ok := s . notaryRequestPool . TryGetData ( hash ) ; ok { // already have checked P2PSigExtEnabled
msg = NewMessage ( CMDP2PNotaryRequest , nrp . ( * payload . P2PNotaryRequest ) )
} else {
notFound = append ( notFound , hash )
}
2020-01-16 18:16:31 +00:00
}
if msg != nil {
pkt , err := msg . Bytes ( )
2020-01-27 08:55:03 +00:00
if err == nil {
2021-01-14 13:38:40 +00:00
if inv . Type == payload . ExtensibleType {
2022-10-10 19:48:06 +00:00
err = p . EnqueueHPPacket ( pkt )
2020-01-27 08:55:03 +00:00
} else {
2020-01-23 16:40:40 +00:00
err = p . EnqueueP2PPacket ( pkt )
2020-01-27 08:55:03 +00:00
}
}
2020-01-16 18:16:31 +00:00
if err != nil {
return err
}
2019-11-08 15:40:21 +00:00
}
2019-10-24 07:18:30 +00:00
}
2020-07-08 12:25:58 +00:00
if len ( notFound ) != 0 {
return p . EnqueueP2PMessage ( NewMessage ( CMDNotFound , payload . NewInventory ( inv . Type , notFound ) ) )
}
2019-10-24 07:18:30 +00:00
return nil
}
2021-07-30 13:57:42 +00:00
// handleGetMPTDataCmd processes the received MPT inventory.
func ( s * Server ) handleGetMPTDataCmd ( p Peer , inv * payload . MPTInventory ) error {
2022-01-20 18:14:42 +00:00
if ! s . config . P2PStateExchangeExtensions {
2021-07-30 13:57:42 +00:00
return errors . New ( "GetMPTDataCMD was received, but P2PStateExchangeExtensions are disabled" )
}
2022-07-14 11:13:38 +00:00
// Even if s.config.KeepOnlyLatestState is enabled, we keep the latest P1 and P2 MPT states.
2021-07-30 13:57:42 +00:00
resp := payload . MPTData { }
capLeft := payload . MaxSize - 8 // max(io.GetVarSize(len(resp.Nodes)))
2021-09-06 12:16:47 +00:00
added := make ( map [ util . Uint256 ] struct { } )
2021-07-30 13:57:42 +00:00
for _ , h := range inv . Hashes {
if capLeft <= 2 { // at least 1 byte for len(nodeBytes) and 1 byte for node type
break
}
err := s . stateSync . Traverse ( h ,
2021-09-06 12:16:47 +00:00
func ( n mpt . Node , node [ ] byte ) bool {
if _ , ok := added [ n . Hash ( ) ] ; ok {
return false
}
2021-07-30 13:57:42 +00:00
l := len ( node )
size := l + io . GetVarSize ( l )
if size > capLeft {
return true
}
resp . Nodes = append ( resp . Nodes , node )
2021-09-06 12:16:47 +00:00
added [ n . Hash ( ) ] = struct { } { }
2021-07-30 13:57:42 +00:00
capLeft -= size
return false
} )
if err != nil {
return fmt . Errorf ( "failed to traverse MPT starting from %s: %w" , h . StringBE ( ) , err )
}
}
if len ( resp . Nodes ) > 0 {
msg := NewMessage ( CMDMPTData , & resp )
return p . EnqueueP2PMessage ( msg )
}
return nil
}
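// Illustrative sketch (not part of the server): handleGetMPTDataCmd above
// fills a response until a byte budget is exhausted, charging each node its
// length plus a variable-length size prefix and skipping nodes already added.
// The helper below isolates that bookkeeping. The names collect and varSize
// are hypothetical, the prefix sizes (1, 3, 5 or 9 bytes) are an assumption
// about the encoding, and deduplication here is keyed by node bytes rather
// than by node hash.

```go
package main

// varSize returns the assumed size of a variable-length integer prefix.
func varSize(n uint64) int {
	switch {
	case n < 0xfd:
		return 1
	case n <= 0xffff:
		return 3
	case n <= 0xffffffff:
		return 5
	default:
		return 9
	}
}

// collect returns the nodes that fit into capLeft bytes, in order,
// skipping duplicates and stopping at the first node that does not fit.
func collect(nodes [][]byte, capLeft int) [][]byte {
	seen := make(map[string]struct{})
	var out [][]byte
	for _, n := range nodes {
		if _, ok := seen[string(n)]; ok {
			continue // already in the response
		}
		size := len(n) + varSize(uint64(len(n)))
		if size > capLeft {
			break // budget exhausted
		}
		out = append(out, n)
		seen[string(n)] = struct{}{}
		capLeft -= size
	}
	return out
}
```

// A 3-byte node costs 4 bytes here (3 bytes of data plus a 1-byte prefix), so
// a budget of 4 admits exactly one such node and a budget of 3 admits none.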
// handleMPTDataCmd processes the received MPT data.
func ( s * Server ) handleMPTDataCmd ( p Peer , data * payload . MPTData ) error {
2022-01-20 18:14:42 +00:00
if ! s . config . P2PStateExchangeExtensions {
2021-07-30 13:57:42 +00:00
return errors . New ( "MPTDataCMD was received, but P2PStateExchangeExtensions are disabled" )
}
return s . stateSync . AddMPTNodes ( data . Nodes )
}
2022-04-20 18:30:09 +00:00
// requestMPTNodes requests the specified MPT nodes from the peer, truncating
// the request to payload.MaxMPTHashesCount hashes.
2021-08-13 09:46:23 +00:00
func ( s * Server ) requestMPTNodes ( p Peer , itms [ ] util . Uint256 ) error {
if len ( itms ) == 0 {
return nil
}
if len ( itms ) > payload . MaxMPTHashesCount {
itms = itms [ : payload . MaxMPTHashesCount ]
}
pl := payload . NewMPTInventory ( itms )
msg := NewMessage ( CMDGetMPTData , pl )
return p . EnqueueP2PMessage ( msg )
}
2019-12-25 16:40:18 +00:00
// handleGetBlocksCmd processes the getblocks request.
func ( s * Server ) handleGetBlocksCmd ( p Peer , gb * payload . GetBlocks ) error {
2020-05-22 14:30:56 +00:00
count := gb . Count
if gb . Count < 0 || gb . Count > payload . MaxHashesCount {
count = payload . MaxHashesCount
2019-12-25 16:40:18 +00:00
}
2020-05-22 14:30:56 +00:00
start , err := s . chain . GetHeader ( gb . HashStart )
2019-12-25 16:40:18 +00:00
if err != nil {
return err
}
blockHashes := make ( [ ] util . Uint256 , 0 )
2020-12-07 15:40:04 +00:00
for i := start . Index + 1 ; i <= start . Index + uint32 ( count ) ; i ++ {
2019-12-25 16:40:18 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
2020-05-22 14:30:56 +00:00
if hash . Equals ( util . Uint256 { } ) {
2019-12-25 16:40:18 +00:00
break
}
blockHashes = append ( blockHashes , hash )
}
if len ( blockHashes ) == 0 {
return nil
}
payload := payload . NewInventory ( payload . BlockType , blockHashes )
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDInv , payload )
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PMessage ( msg )
2019-12-25 16:40:18 +00:00
}
2020-07-31 10:58:22 +00:00
// handleGetBlockByIndexCmd processes the getblockbyindex request.
func ( s * Server ) handleGetBlockByIndexCmd ( p Peer , gbd * payload . GetBlockByIndex ) error {
2020-07-31 11:17:14 +00:00
count := gbd . Count
if gbd . Count < 0 || gbd . Count > payload . MaxHashesCount {
count = payload . MaxHashesCount
}
for i := gbd . IndexStart ; i < gbd . IndexStart + uint32 ( count ) ; i ++ {
2020-07-31 11:51:51 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
if hash . Equals ( util . Uint256 { } ) {
break
}
b , err := s . chain . GetBlock ( hash )
2020-05-22 12:43:46 +00:00
if err != nil {
2020-07-31 11:51:51 +00:00
break
2020-05-22 12:43:46 +00:00
}
msg := NewMessage ( CMDBlock , b )
2020-07-31 11:51:51 +00:00
if err = p . EnqueueP2PMessage ( msg ) ; err != nil {
return err
}
2020-05-22 12:43:46 +00:00
}
return nil
}
2019-11-29 08:08:22 +00:00
// handleGetHeadersCmd processes the getheaders request.
2020-07-31 11:47:42 +00:00
func ( s * Server ) handleGetHeadersCmd ( p Peer , gh * payload . GetBlockByIndex ) error {
if gh . IndexStart > s . chain . HeaderHeight ( ) {
return nil
2019-11-29 08:08:22 +00:00
}
2020-07-31 11:47:42 +00:00
count := gh . Count
if gh . Count < 0 || gh . Count > payload . MaxHeadersAllowed {
count = payload . MaxHeadersAllowed
2019-11-29 08:08:22 +00:00
}
resp := payload . Headers { }
2020-07-31 11:47:42 +00:00
resp . Hdrs = make ( [ ] * block . Header , 0 , count )
for i := gh . IndexStart ; i < gh . IndexStart + uint32 ( count ) ; i ++ {
2019-11-29 08:08:22 +00:00
hash := s . chain . GetHeaderHash ( int ( i ) )
2020-05-22 14:30:56 +00:00
if hash . Equals ( util . Uint256 { } ) {
2019-11-29 08:08:22 +00:00
break
}
header , err := s . chain . GetHeader ( hash )
if err != nil {
break
}
resp . Hdrs = append ( resp . Hdrs , header )
}
if len ( resp . Hdrs ) == 0 {
return nil
}
2020-05-21 10:35:44 +00:00
msg := NewMessage ( CMDHeaders , & resp )
2020-01-23 16:40:40 +00:00
return p . EnqueueP2PMessage ( msg )
2019-11-29 08:08:22 +00:00
}
2021-07-30 13:57:42 +00:00
// handleHeadersCmd processes headers payload.
func ( s * Server ) handleHeadersCmd ( p Peer , h * payload . Headers ) error {
return s . stateSync . AddHeaders ( h . Hdrs ... )
}
2022-04-20 18:30:09 +00:00
// handleExtensibleCmd processes the received extensible payload.
2021-01-14 13:38:40 +00:00
func ( s * Server ) handleExtensibleCmd ( e * payload . Extensible ) error {
2021-04-02 09:55:56 +00:00
if ! s . syncReached . Load ( ) {
return nil
2021-02-05 11:54:43 +00:00
}
2021-01-18 12:52:51 +00:00
ok , err := s . extensiblePool . Add ( e )
if err != nil {
2021-01-14 13:38:40 +00:00
return err
}
2021-01-18 12:52:51 +00:00
if ! ok { // payload is already in cache
return nil
2021-01-14 13:38:40 +00:00
}
2022-07-27 08:25:58 +00:00
s . serviceLock . RLock ( )
2022-01-12 18:09:37 +00:00
handler := s . extensHandlers [ e . Category ]
2022-07-27 08:25:58 +00:00
s . serviceLock . RUnlock ( )
2022-01-12 18:09:37 +00:00
if handler != nil {
err = handler ( e )
2021-02-01 16:00:07 +00:00
if err != nil {
return err
}
2021-01-14 13:38:40 +00:00
}
2022-01-12 18:09:37 +00:00
s . advertiseExtensible ( e )
return nil
}
2021-01-18 12:52:51 +00:00
2022-01-12 18:09:37 +00:00
func ( s * Server ) advertiseExtensible ( e * payload . Extensible ) {
2021-01-18 12:52:51 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . ExtensibleType , [ ] util . Uint256 { e . Hash ( ) } ) )
2022-07-28 15:30:14 +00:00
if e . Category == payload . ConsensusCategory {
2022-01-12 18:09:37 +00:00
// It's high priority because it directly affects consensus process,
// even though it's just an inv.
2021-01-18 12:52:51 +00:00
s . broadcastHPMessage ( msg )
} else {
s . broadcastMessage ( msg )
}
2019-11-08 15:40:21 +00:00
}
2022-04-20 18:30:09 +00:00
// handleTxCmd processes the received transaction.
2019-11-15 10:32:40 +00:00
// It never returns an error.
func ( s * Server ) handleTxCmd ( tx * transaction . Transaction ) error {
2019-11-29 08:09:54 +00:00
// It's OK for it to fail for various reasons like tx already existing
// in the pool.
2021-08-03 19:43:31 +00:00
s . txInLock . Lock ( )
_ , ok := s . txInMap [ tx . Hash ( ) ]
if ok || s . mempool . ContainsKey ( tx . Hash ( ) ) {
s . txInLock . Unlock ( )
return nil
}
s . txInMap [ tx . Hash ( ) ] = struct { } { }
s . txInLock . Unlock ( )
2022-07-27 08:25:58 +00:00
s . serviceLock . RLock ( )
txCallback := s . txCallback
s . serviceLock . RUnlock ( )
if txCallback != nil {
txCallback ( tx )
2022-01-14 16:51:04 +00:00
}
2021-02-17 11:51:54 +00:00
if s . verifyAndPoolTX ( tx ) == nil {
2020-11-27 10:55:48 +00:00
s . broadcastTX ( tx , nil )
}
2021-08-03 19:43:31 +00:00
s . txInLock . Lock ( )
delete ( s . txInMap , tx . Hash ( ) )
s . txInLock . Unlock ( )
2020-11-27 10:55:48 +00:00
return nil
}
2022-04-20 18:30:09 +00:00
// handleP2PNotaryRequestCmd processes the received P2PNotaryRequest payload.
2020-11-27 10:55:48 +00:00
func ( s * Server ) handleP2PNotaryRequestCmd ( r * payload . P2PNotaryRequest ) error {
if ! s . chain . P2PSigExtensionsEnabled ( ) {
return errors . New ( "P2PNotaryRequestCMD was received, but P2PSignatureExtensions are disabled" )
}
2021-02-17 11:51:54 +00:00
// It's OK for it to fail for various reasons like request already existing
// in the pool.
2021-05-12 17:14:35 +00:00
_ = s . RelayP2PNotaryRequest ( r )
2021-02-08 08:48:28 +00:00
return nil
}
2022-04-20 18:30:09 +00:00
// RelayP2PNotaryRequest adds the given request to the pool and relays it. It does not check
2021-02-08 08:48:28 +00:00
// whether P2PSigExtensions are enabled.
2021-02-17 11:51:54 +00:00
func ( s * Server ) RelayP2PNotaryRequest ( r * payload . P2PNotaryRequest ) error {
err := s . verifyAndPoolNotaryRequest ( r )
if err == nil {
2020-11-27 10:55:48 +00:00
s . broadcastP2PNotaryRequestPayload ( nil , r )
}
2021-02-17 11:51:54 +00:00
return err
2020-11-27 10:55:48 +00:00
}
// verifyAndPoolNotaryRequest verifies NotaryRequest payload and adds it to the payload mempool.
2021-02-17 11:51:54 +00:00
func ( s * Server ) verifyAndPoolNotaryRequest ( r * payload . P2PNotaryRequest ) error {
2022-01-14 01:09:54 +00:00
return s . chain . PoolTxWithData ( r . FallbackTransaction , r , s . notaryRequestPool , s . notaryFeer , s . verifyNotaryRequest )
2020-11-27 10:55:48 +00:00
}
// verifyNotaryRequest is a function for state-dependent P2PNotaryRequest payload verification, which is executed before the ordinary blockchain verification.
2022-01-14 01:09:54 +00:00
func ( s * Server ) verifyNotaryRequest ( _ * transaction . Transaction , data interface { } ) error {
2020-11-27 10:55:48 +00:00
r := data . ( * payload . P2PNotaryRequest )
payer := r . FallbackTransaction . Signers [ 1 ] . Account
2022-01-14 01:09:54 +00:00
if _ , err := s . chain . VerifyWitness ( payer , r , & r . Witness , s . chain . GetMaxVerificationGAS ( ) ) ; err != nil {
2020-11-27 10:55:48 +00:00
return fmt . Errorf ( "bad P2PNotaryRequest payload witness: %w" , err )
}
2022-01-14 01:09:54 +00:00
notaryHash := s . chain . GetNotaryContractScriptHash ( )
2020-12-30 08:01:13 +00:00
if r . FallbackTransaction . Sender ( ) != notaryHash {
2020-11-27 10:55:48 +00:00
return errors . New ( "P2PNotary contract should be a sender of the fallback transaction" )
}
2022-01-14 01:09:54 +00:00
depositExpiration := s . chain . GetNotaryDepositExpiration ( payer )
2020-11-27 10:55:48 +00:00
if r . FallbackTransaction . ValidUntilBlock >= depositExpiration {
return fmt . Errorf ( "fallback transaction is valid after deposit is unlocked: ValidUntilBlock is %d, deposit lock expires at %d" , r . FallbackTransaction . ValidUntilBlock , depositExpiration )
2020-01-29 08:56:40 +00:00
}
2019-11-15 10:32:40 +00:00
return nil
}
2020-11-27 10:55:48 +00:00
func ( s * Server ) broadcastP2PNotaryRequestPayload ( _ * transaction . Transaction , data interface { } ) {
2021-02-08 15:58:10 +00:00
r := data . ( * payload . P2PNotaryRequest ) // we can guarantee that cast is successful
2020-11-27 10:55:48 +00:00
msg := NewMessage ( CMDInv , payload . NewInventory ( payload . P2PNotaryRequestType , [ ] util . Uint256 { r . FallbackTransaction . Hash ( ) } ) )
s . broadcastMessage ( msg )
}
2022-04-20 18:30:09 +00:00
// handleAddrCmd will process the received addresses.
2019-09-13 09:03:07 +00:00
func ( s * Server ) handleAddrCmd ( p Peer , addrs * payload . AddressList ) error {
2020-11-25 10:34:38 +00:00
if ! p . CanProcessAddr ( ) {
return errors . New ( "unexpected addr received" )
}
2021-03-26 09:31:07 +00:00
dups := make ( map [ string ] bool )
2019-09-13 09:03:07 +00:00
for _ , a := range addrs . Addrs {
2020-05-22 09:59:18 +00:00
addr , err := a . GetTCPAddress ( )
2021-03-26 09:31:07 +00:00
if err == nil && ! dups [ addr ] {
dups [ addr ] = true
2020-05-22 09:59:18 +00:00
s . discovery . BackFill ( addr )
}
2019-09-13 09:03:07 +00:00
}
return nil
}
2019-09-13 17:38:34 +00:00
// handleGetAddrCmd sends to the peer some good addresses that we know of.
func ( s * Server ) handleGetAddrCmd ( p Peer ) error {
addrs := s . discovery . GoodPeers ( )
2020-10-07 20:29:20 +00:00
if len ( addrs ) > payload . MaxAddrsCount {
addrs = addrs [ : payload . MaxAddrsCount ]
2019-09-13 17:38:34 +00:00
}
alist := payload . NewAddressList ( len ( addrs ) )
ts := time . Now ( )
for i , addr := range addrs {
// we know it's a good address, so it can't fail
2020-05-22 09:59:18 +00:00
netaddr , _ := net . ResolveTCPAddr ( "tcp" , addr . Address )
alist . Addrs [ i ] = payload . NewAddressAndTime ( netaddr , ts , addr . Capabilities )
2019-09-13 17:38:34 +00:00
}
2020-05-21 10:35:44 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDAddr , alist ) )
2019-09-13 17:38:34 +00:00
}
2020-07-31 14:12:13 +00:00
// requestBlocks sends a CMDGetBlockByIndex message to the peer
2022-04-20 18:30:09 +00:00
// to sync up in blocks. A maximum of maxBlockBatch will be
// sent at once. There are two things we need to take care of:
2022-08-08 10:23:21 +00:00
// 1. If possible, blocks should be fetched in parallel.
// height..+500 to one peer, height+500..+1000 to another etc.
// 2. Every block must eventually be fetched even if the peer sends no answer.
//
2022-04-20 18:30:09 +00:00
// Thus, the following algorithm is used:
2020-11-26 15:53:10 +00:00
// 1. Block range is divided into chunks of payload.MaxHashesCount.
// 2. Send requests for chunks in increasing order.
2022-04-20 18:30:09 +00:00
// 3. After all requests have been sent, request a random height.
2022-01-14 01:09:54 +00:00
func ( s * Server ) requestBlocks ( bq Blockqueuer , p Peer ) error {
2022-01-17 21:04:41 +00:00
h := bq . BlockHeight ( )
pl := getRequestBlocksPayload ( p , h , & s . lastRequestedBlock )
lq := s . bQueue . lastQueued ( )
if lq > pl . IndexStart {
c := int16 ( h + blockCacheSize - lq )
if c < payload . MaxHashesCount {
pl . Count = c
}
pl . IndexStart = lq + 1
}
2021-09-27 13:13:37 +00:00
return p . EnqueueP2PMessage ( NewMessage ( CMDGetBlockByIndex , pl ) )
}
func getRequestBlocksPayload ( p Peer , currHeight uint32 , lastRequestedHeight * atomic . Uint32 ) * payload . GetBlockByIndex {
2020-11-26 15:53:10 +00:00
var peerHeight = p . LastBlockIndex ( )
var needHeight uint32
2021-09-27 13:13:37 +00:00
// lastRequestedHeight can only be increased.
2020-11-26 15:53:10 +00:00
for {
2021-09-27 13:13:37 +00:00
old := lastRequestedHeight . Load ( )
2020-11-26 15:53:10 +00:00
if old <= currHeight {
needHeight = currHeight + 1
2021-09-27 13:13:37 +00:00
if ! lastRequestedHeight . CAS ( old , needHeight ) {
2020-11-26 15:53:10 +00:00
continue
}
} else if old < currHeight + ( blockCacheSize - payload . MaxHashesCount ) {
needHeight = currHeight + 1
if peerHeight > old + payload . MaxHashesCount {
needHeight = old + payload . MaxHashesCount
2021-09-27 13:13:37 +00:00
if ! lastRequestedHeight . CAS ( old , needHeight ) {
2020-11-26 15:53:10 +00:00
continue
}
}
} else {
index := mrand . Intn ( blockCacheSize / payload . MaxHashesCount )
needHeight = currHeight + 1 + uint32 ( index * payload . MaxHashesCount )
}
break
}
2021-09-27 13:13:37 +00:00
return payload . NewGetBlockByIndex ( needHeight , - 1 )
2018-02-01 08:00:42 +00:00
}
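// Illustrative sketch (not part of the server): getRequestBlocksPayload above
// only ever moves the shared last-requested height forward, retrying its
// compare-and-swap whenever another goroutine changed the value in between.
// The helper below shows that retry loop on its own, using the standard
// library's sync/atomic functions instead of the atomic.Uint32 type used
// above. The name raise and its parameters are hypothetical.

```go
package main

import "sync/atomic"

// raise sets *last to target if target is greater than the current value.
// It reports whether the stored value was changed. The loop retries when a
// concurrent writer modified *last between the load and the swap, so the
// stored value never decreases.
func raise(last *uint32, target uint32) bool {
	for {
		old := atomic.LoadUint32(last)
		if old >= target {
			return false // someone already requested this far or further
		}
		if atomic.CompareAndSwapUint32(last, old, target) {
			return true
		}
	}
}
```

// Starting from zero, raise(&v, 5) stores 5 and returns true, while a later
// raise(&v, 3) leaves the value at 5 and returns false.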
2019-10-22 14:56:03 +00:00
// handleMessage processes the given message.
2018-04-13 10:14:08 +00:00
func ( s * Server ) handleMessage ( peer Peer , msg * Message ) error {
2020-01-28 13:40:38 +00:00
s . log . Debug ( "got msg" ,
zap . Stringer ( "addr" , peer . RemoteAddr ( ) ) ,
2020-05-19 11:54:51 +00:00
zap . String ( "type" , msg . Command . String ( ) ) )
2020-01-28 13:40:38 +00:00
2019-09-13 12:43:22 +00:00
if peer . Handshaked ( ) {
2019-10-24 10:10:10 +00:00
if inv , ok := msg . Payload . ( * payload . Inventory ) ; ok {
2020-11-27 10:55:48 +00:00
if ! inv . Type . Valid ( s . chain . P2PSigExtensionsEnabled ( ) ) || len ( inv . Hashes ) == 0 {
2019-10-24 10:10:10 +00:00
return errInvalidInvType
}
}
2020-05-19 11:54:51 +00:00
switch msg . Command {
2019-09-13 12:43:22 +00:00
case CMDAddr :
addrs := msg . Payload . ( * payload . AddressList )
return s . handleAddrCmd ( peer , addrs )
2019-09-13 17:38:34 +00:00
case CMDGetAddr :
// it has no payload
return s . handleGetAddrCmd ( peer )
2019-12-25 16:40:18 +00:00
case CMDGetBlocks :
gb := msg . Payload . ( * payload . GetBlocks )
return s . handleGetBlocksCmd ( peer , gb )
2020-07-31 10:58:22 +00:00
case CMDGetBlockByIndex :
gbd := msg . Payload . ( * payload . GetBlockByIndex )
return s . handleGetBlockByIndexCmd ( peer , gbd )
2019-10-24 07:18:30 +00:00
case CMDGetData :
inv := msg . Payload . ( * payload . Inventory )
return s . handleGetDataCmd ( peer , inv )
2021-07-30 13:57:42 +00:00
case CMDGetMPTData :
inv := msg . Payload . ( * payload . MPTInventory )
return s . handleGetMPTDataCmd ( peer , inv )
case CMDMPTData :
inv := msg . Payload . ( * payload . MPTData )
return s . handleMPTDataCmd ( peer , inv )
2019-11-29 08:08:22 +00:00
case CMDGetHeaders :
2020-07-31 11:47:42 +00:00
gh := msg . Payload . ( * payload . GetBlockByIndex )
2019-11-29 08:08:22 +00:00
return s . handleGetHeadersCmd ( peer , gh )
		case CMDHeaders:
			h := msg.Payload.(*payload.Headers)
			return s.handleHeadersCmd(peer, h)
		case CMDInv:
			inventory := msg.Payload.(*payload.Inventory)
			return s.handleInvCmd(peer, inventory)
		case CMDMempool:
			// no payload
			return s.handleMempoolCmd(peer)
		case CMDBlock:
			block := msg.Payload.(*block.Block)
			return s.handleBlockCmd(peer, block)
		case CMDExtensible:
			cp := msg.Payload.(*payload.Extensible)
			return s.handleExtensibleCmd(cp)
		case CMDTX:
			tx := msg.Payload.(*transaction.Transaction)
			return s.handleTxCmd(tx)
		case CMDP2PNotaryRequest:
			r := msg.Payload.(*payload.P2PNotaryRequest)
			return s.handleP2PNotaryRequestCmd(r)
		case CMDPing:
			ping := msg.Payload.(*payload.Ping)
			return s.handlePing(peer, ping)
		case CMDPong:
			pong := msg.Payload.(*payload.Ping)
			return s.handlePong(peer, pong)
		case CMDVersion, CMDVerack:
			return fmt.Errorf("received '%s' after the handshake", msg.Command.String())
		}
	} else {
		switch msg.Command {
		case CMDVersion:
			version := msg.Payload.(*payload.Version)
			return s.handleVersionCmd(peer, version)
		case CMDVerack:
			err := peer.HandleVersionAck()
			if err != nil {
				return err
			}
			go peer.StartProtocol()

			s.tryInitStateSync()
			s.tryStartServices()
		default:
			return fmt.Errorf("received '%s' during handshake", msg.Command.String())
		}
	}
	return nil
}
Implement rpc server method: sendrawtransaction (#174)
* Added new config attributes: 'SecondsPerBlock','LowPriorityThreshold'
* Added new files:
* Added new method: CompareTo
* Fixed empty Slice case
* Added new methods: LessThan, GreaterThan, Equal, CompareTo
* Added new method: InputIntersection
* Added MaxTransactionSize, GroupOutputByAssetID
* Added new method: ScriptHash
* Added new method: IsDoubleSpend
* Refactor blockchainer, Added Feer interface, Verify and GetMemPool method
* 1) Added MemPool
2) Added new methods to satisfy the blockchainer interface: IsLowPriority, Verify, GetMemPool
* Added new methods: RelayTxn, RelayDirectly
* Fixed tests
* Implemented RPC server method sendrawtransaction
* Refactor getrawtransaction, sendrawtransaction in separate methods
* Moved 'secondsPerBlock' to config file
* Implemented Kim suggestions:
1) Fixed data race issues
2) refactor Verify method
3) Get rid of unused InputIntersection method due to refactoring Verify method
4) Fixed bug in https://github.com/CityOfZion/neo-go/pull/174#discussion_r264108135
5) minor simplifications of the code
* Fixed minor issues related to
1) space
2) getter methods do not need pointer on the receiver
3) error message
4) refactoring CompareTo method in uint256.go
* Fixed small issues
* Use sync.RWMutex instead of sync.Mutex
* Refined (R)Lock/(R)Unlock
* return error instead of bool in Verify methods

// tryInitStateSync initializes the state sync module if it is active and not
// yet initialized, using the median last block index of handshaked peers as
// the target height. It does nothing until at least MinPeers have handshaked.
func (s *Server) tryInitStateSync() {
	if !s.stateSync.IsActive() {
		s.bSyncQueue.discard()
		return
	}

	if s.stateSync.IsInitialized() {
		return
	}

	var peersNumber int
	s.lock.RLock()
	heights := make([]uint32, 0)
	for p := range s.peers {
		if p.Handshaked() {
			peersNumber++
			peerLastBlock := p.LastBlockIndex()
			// Keep heights sorted: find the insertion point, then shift the tail.
			i := sort.Search(len(heights), func(i int) bool {
				return heights[i] >= peerLastBlock
			})
			heights = append(heights, peerLastBlock)
			if i != len(heights)-1 {
				copy(heights[i+1:], heights[i:])
				heights[i] = peerLastBlock
			}
		}
	}
	s.lock.RUnlock()
	if peersNumber >= s.MinPeers && len(heights) > 0 {
		// choose the height of the median peer as the current chain's height
		h := heights[len(heights)/2]
		err := s.stateSync.Init(h)
		if err != nil {
			s.log.Fatal("failed to init state sync module",
				zap.Uint32("evaluated chain's blockHeight", h),
				zap.Uint32("blockHeight", s.chain.BlockHeight()),
				zap.Uint32("headerHeight", s.chain.HeaderHeight()),
				zap.Error(err))
		}

		// module can be inactive after init (i.e. full state is collected and ordinary block processing is needed)
		if !s.stateSync.IsActive() {
			s.bSyncQueue.discard()
		}
	}
}
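
// A minimal standalone sketch of the sorted insert and median selection that
// tryInitStateSync performs on peer heights. insertSorted and medianHeight are
// hypothetical helper names used only for illustration, they are not part of
// the Server code, and the sample heights are made up.

```go
package main

import (
	"fmt"
	"sort"
)

// insertSorted inserts v into the ascending slice s and keeps it sorted.
func insertSorted(s []uint32, v uint32) []uint32 {
	// Find the first position whose element is not less than v.
	i := sort.Search(len(s), func(i int) bool { return s[i] >= v })
	s = append(s, v)
	// Shift the tail one slot to the right and put v into the gap.
	copy(s[i+1:], s[i:])
	s[i] = v
	return s
}

// medianHeight returns the element at index len/2 of the sorted heights
// (the upper median for an even count) and false for an empty input.
func medianHeight(heights []uint32) (uint32, bool) {
	var sorted []uint32
	for _, h := range heights {
		sorted = insertSorted(sorted, h)
	}
	if len(sorted) == 0 {
		return 0, false
	}
	return sorted[len(sorted)/2], true
}

func main() {
	// One peer reporting an outlier height does not move the median.
	h, ok := medianHeight([]uint32{120, 7, 118, 119, 5000})
	fmt.Println(h, ok) // prints "119 true"
}
```

// Taking the median rather than the maximum means a single peer advertising
// an unrealistic height cannot alone decide the state sync target.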

// BroadcastExtensible adds a locally-generated Extensible payload to the pool
// and advertises it to peers.
func (s *Server) BroadcastExtensible(p *payload.Extensible) {
	_, err := s.extensiblePool.Add(p)
	if err != nil {
		s.log.Error("created payload is not valid", zap.Error(err))
		return
	}

	s.advertiseExtensible(p)
}

// RequestTx asks for the given transactions from Server peers using GetData message.
func (s *Server) RequestTx(hashes ...util.Uint256) {
	if len(hashes) == 0 {
		return
	}

	for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ {
		start := i * payload.MaxHashesCount
		stop := (i + 1) * payload.MaxHashesCount
		if stop > len(hashes) {
			stop = len(hashes)
		}
		if start == stop {
			break
		}
		msg := NewMessage(CMDGetData, payload.NewInventory(payload.TXType, hashes[start:stop]))
		// It's high priority because it directly affects consensus process,
		// even though it's getdata.
		s.broadcastHPMessage(msg)
	}
}
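
// A minimal standalone sketch of the batching idea behind RequestTx: a list of
// n hashes is split into consecutive ranges of at most a fixed size, one range
// per GetData message. chunkBounds is a hypothetical helper and its loop is an
// equivalent simplification, not the loop used by RequestTx itself.

```go
package main

import "fmt"

// chunkBounds returns [start, stop) index pairs covering n items in
// consecutive pieces of at most size items each.
func chunkBounds(n, size int) [][2]int {
	var bounds [][2]int
	if size <= 0 {
		return bounds
	}
	for start := 0; start < n; start += size {
		stop := start + size
		if stop > n {
			stop = n // the last piece may be shorter
		}
		bounds = append(bounds, [2]int{start, stop})
	}
	return bounds
}

func main() {
	// 5 items in pieces of at most 2.
	fmt.Println(chunkBounds(5, 2)) // prints [[0 2] [2 4] [4 5]]
}
```

// When n is an exact multiple of the size no empty trailing piece is produced,
// which is what the start == stop check guards against in RequestTx.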

// iteratePeersWithSendMsg tries to send the given message to all peers using
// the two functions passed: one sends the message and the other filters peers
// (a peer is skipped if the filter returns false). Pending sends are canceled
// once 2/3 of the responsive peers have accepted the message or the timeout
// (half of TimePerBlock) expires.
func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.Context, []byte) error, peerOK func(Peer) bool) {
	var deadN, peerN, sentN int

	// Get a copy of s.peers to avoid holding a lock while sending.
	peers := s.getPeers(peerOK)
	peerN = len(peers)
	if peerN == 0 {
		return
	}
	pkt, err := msg.Bytes()
	if err != nil {
		return
	}

	var replies = make(chan error, peerN) // Buffering is there just to make goroutines exit faster.
	var ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2)
	for _, peer := range peers {
		go func(p Peer, ctx context.Context, pkt []byte) {
			// Do this before the packet is sent, the reader thread can get the reply before this routine wakes up.
			if msg.Command == CMDGetAddr {
				p.AddGetAddrSent()
			}
			replies <- send(p, ctx, pkt)
		}(peer, ctx, pkt)
	}
	for r := range replies {
		if r == nil {
			sentN++
		} else {
			deadN++
		}
		if sentN+deadN == peerN {
			break
		}
		// Send to 2/3 of good peers.
		if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil {
			cancel()
		}
	}
	cancel()
	close(replies)
}
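
// A minimal standalone sketch of the fan-out pattern used by
// iteratePeersWithSendMsg: start one goroutine per peer, count successes and
// failures, and cancel the shared context once two thirds of the peers that
// have not failed accepted the packet. sendFunc, fanOut, fastSender,
// stuckSender and demo are hypothetical names for illustration, the real code
// works with Peer methods and a timeout of s.TimePerBlock/2. The sketch also
// counts replies with a plain loop instead of ranging over the channel.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendFunc is a hypothetical stand-in for a per-peer send method.
type sendFunc func(ctx context.Context, pkt []byte) error

// fanOut pushes pkt through all senders concurrently and reports how many
// sends succeeded and how many failed (including canceled ones).
func fanOut(senders []sendFunc, pkt []byte, timeout time.Duration) (sentN, deadN int) {
	peerN := len(senders)
	if peerN == 0 {
		return 0, 0
	}
	// Buffered so that no goroutine blocks on reporting its result.
	replies := make(chan error, peerN)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	for _, send := range senders {
		go func(send sendFunc) {
			replies <- send(ctx, pkt)
		}(send)
	}
	for i := 0; i < peerN; i++ {
		if err := <-replies; err == nil {
			sentN++
		} else {
			deadN++
		}
		// Enough deliveries: release the senders that are still waiting.
		if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil {
			cancel()
		}
	}
	return sentN, deadN
}

// fastSender models a peer with a free queue.
func fastSender(ctx context.Context, pkt []byte) error { return nil }

// stuckSender models a peer with a full queue: it waits until canceled.
func stuckSender(ctx context.Context, pkt []byte) error {
	<-ctx.Done()
	return ctx.Err()
}

// demo runs two fast peers and one stuck peer.
func demo() (int, int) {
	return fanOut([]sendFunc{fastSender, fastSender, stuckSender}, []byte("inv"), 5*time.Second)
}

func main() {
	sentN, deadN := demo()
	fmt.Println(sentN, deadN) // prints "2 1"
}
```

// With two of three peers accepting the packet the threshold is met, so the
// stuck peer is canceled right away instead of holding the caller until the
// timeout expires.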

// broadcastMessage sends the message to all available peers.
func (s *Server) broadcastMessage(msg *Message) {
	s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, nil)
}

// broadcastHPMessage sends the high-priority message to all available peers.
func (s *Server) broadcastHPMessage(msg *Message) {
	s.iteratePeersWithSendMsg(msg, Peer.BroadcastHPPacket, nil)
}

// relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them
// to the network. Intended to be run as a separate goroutine.
func (s *Server) relayBlocksLoop() {
	ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays.
	s.chain.SubscribeForBlocks(ch)
mainloop:
	for {
		select {
		case <-s.quit:
			s.chain.UnsubscribeFromBlocks(ch)
			break mainloop
		case b := <-ch:
			msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
			// Filter out nodes that are more current (avoid spamming the network
			// during initial sync).
			s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, func(p Peer) bool {
				return p.Handshaked() && p.LastBlockIndex() < b.Index
			})
			s.extensiblePool.RemoveStale(b.Index)
		}
	}
drainBlocksLoop:
	for {
		select {
		case <-ch:
		default:
			break drainBlocksLoop
		}
	}
	close(ch)
	close(s.relayFin)
}
2020-01-29 08:56:40 +00:00
// verifyAndPoolTX verifies the TX and adds it to the local mempool.
2021-02-17 11:51:54 +00:00
func (s *Server) verifyAndPoolTX(t *transaction.Transaction) error {
	return s.chain.PoolTx(t)
2020-01-29 08:56:40 +00:00
}
// RelayTxn relays a new transaction to the local node and the connected peers.
// Reference: the method OnRelay in C#: https://github.com/neo-project/neo/blob/master/neo/Network/P2P/LocalNode.cs#L159
2021-02-17 11:51:54 +00:00
func (s *Server) RelayTxn(t *transaction.Transaction) error {
	err := s.verifyAndPoolTX(t)
	if err == nil {
2020-11-27 10:55:48 +00:00
		s.broadcastTX(t, nil)
2020-01-29 08:56:40 +00:00
	}
2021-02-17 11:51:54 +00:00
	return err
2020-01-29 08:56:40 +00:00
}
Implement rpc server method: sendrawtransaction (#174)
* Added new config attributes: 'SecondsPerBlock','LowPriorityThreshold'
* Added new files:
* Added new method: CompareTo
* Fixed empty Slice case
* Added new methods: LessThan, GreaterThan, Equal, CompareTo
* Added new method: InputIntersection
* Added MaxTransactionSize, GroupOutputByAssetID
* Added new method: ScriptHash
* Added new method: IsDoubleSpend
* Refactor blockchainer, Added Feer interface, Verify and GetMemPool method
* 1) Added MemPool
2) Added new methods to satisfy the blockchainer interface: IsLowPriority, Verify, GetMemPool
* Added new methods: RelayTxn, RelayDirectly
* Fixed tests
* Implemented RPC server method sendrawtransaction
* Refactor getrawtransaction, sendrawtransaction in separate methods
* Moved 'secondsPerBlock' to config file
* Implemented Kim suggestions:
1) Fixed data race issues
2) refactor Verify method
3) Get rid of unused InputIntersection method due to refactoring Verify method
4) Fixed bug in https://github.com/CityOfZion/neo-go/pull/174#discussion_r264108135
5) minor simplifications of the code
* Fixed minor issues related to
1) space
2) getter methods do not need pointer on the receiver
3) error message
4) refactoring CompareTo method in uint256.go
* Fixed small issues
* Use sync.RWMutex instead of sync.Mutex
* Refined (R)Lock/(R)Unlock
* return error instead of bool in Verify methods
2019-03-20 12:30:05 +00:00
2020-01-29 08:56:40 +00:00
// broadcastTX broadcasts an inventory message about a new transaction.
2020-11-27 10:55:48 +00:00
func (s *Server) broadcastTX(t *transaction.Transaction, _ interface{}) {
2020-02-18 15:11:55 +00:00
	select {
	case s.transactions <- t:
	case <-s.quit:
	}
}
func (s *Server) broadcastTxHashes(hs []util.Uint256) {
2020-05-21 10:35:44 +00:00
	msg := NewMessage(CMDInv, payload.NewInventory(payload.TXType, hs))
2020-01-22 08:01:13 +00:00
	// We need to filter out non-relaying nodes, so plain broadcast
	// functions don't fit here.
2022-10-10 19:48:06 +00:00
	s.iteratePeersWithSendMsg(msg, Peer.BroadcastPacket, Peer.IsFullNode)
2019-03-20 12:30:05 +00:00
}
2020-02-18 15:11:55 +00:00
2020-11-27 10:55:48 +00:00
// initStaleMemPools initializes mempools for stale tx/payload processing.
func (s *Server) initStaleMemPools() {
2020-11-11 12:49:51 +00:00
	threshold := 5
2022-01-21 02:33:06 +00:00
	// Not perfect, can change over time, but should be sufficient.
	numOfCNs := s.config.GetNumOfCNs(s.chain.BlockHeight())
	if numOfCNs*2 > threshold {
		threshold = numOfCNs * 2
2020-11-11 12:49:51 +00:00
	}
2021-08-03 19:28:16 +00:00
	s.mempool.SetResendThreshold(uint32(threshold), s.broadcastTX)
2020-11-27 10:55:48 +00:00
	if s.chain.P2PSigExtensionsEnabled() {
		s.notaryRequestPool.SetResendThreshold(uint32(threshold), s.broadcastP2PNotaryRequestPayload)
	}
2020-11-11 12:49:51 +00:00
}
2020-02-18 15:11:55 +00:00
// broadcastTxLoop is a loop for batching and sending
// transaction hashes in an INV payload.
func (s *Server) broadcastTxLoop() {
	const (
		batchTime = time.Millisecond * 50
		batchSize = 32
	)
	txs := make([]util.Uint256, 0, batchSize)
	var timer *time.Timer
	// timerCh returns a nil channel (which never fires) until the first
	// timer is created.
	timerCh := func() <-chan time.Time {
		if timer == nil {
			return nil
		}
		return timer.C
	}
	broadcast := func() {
		s.broadcastTxHashes(txs)
		txs = txs[:0]
		if timer != nil {
			timer.Stop()
		}
	}
	for {
		select {
		case <-s.quit:
			// Drain whatever is still queued before exiting.
		loop:
			for {
				select {
				case <-s.transactions:
				default:
					break loop
				}
			}
			return
		case <-timerCh():
			// Flush an incomplete batch on timeout.
			if len(txs) > 0 {
				broadcast()
			}
		case tx := <-s.transactions:
			// Start the batch timer on the first transaction of a batch.
			if len(txs) == 0 {
				timer = time.NewTimer(batchTime)
			}
			txs = append(txs, tx.Hash())
			if len(txs) == batchSize {
				broadcast()
			}
		}
	}
}
2020-06-10 07:01:21 +00:00
2021-04-30 08:24:43 +00:00
// Port returns a server port that should be used in P2P version exchange. In
2022-04-20 18:30:09 +00:00
// case `AnnouncedPort` is set in the server.Config, the announced node port
2021-04-30 08:24:43 +00:00
// will be returned (e.g. consider the node running behind NAT). If `AnnouncedPort`
// isn't set, the port returned may still differ from that of server.Config.
2020-06-10 07:01:21 +00:00
func (s *Server) Port() (uint16, error) {
2021-04-30 08:24:43 +00:00
	if s.AnnouncedPort != 0 {
		return s.ServerConfig.AnnouncedPort, nil
	}
2020-06-10 07:01:21 +00:00
	var port uint16
	_, portStr, err := net.SplitHostPort(s.transport.Address())
	if err != nil {
		port = s.ServerConfig.Port
	} else {
		p, err := strconv.ParseUint(portStr, 10, 16)
		if err != nil {
			return 0, err
		}
		port = uint16(p)
	}
	return port, nil
}
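The order of checks in Port can be tried out in isolation with plain values. The sketch below is an assumption-laden stand-in rather than part of the server: `resolvePort` and its parameters are hypothetical names that replace the Server fields and the transport address.

```go
package main

import (
	"net"
	"strconv"
)

// resolvePort is a hypothetical helper that follows the same order of checks
// as Server.Port: the announced port wins, then the port of the actual listen
// address, and the configured port is used if the address can't be split.
func resolvePort(announced uint16, listenAddr string, configured uint16) (uint16, error) {
	if announced != 0 {
		return announced, nil
	}
	_, portStr, err := net.SplitHostPort(listenAddr)
	if err != nil {
		// No usable listen address, fall back to the configured port.
		return configured, nil
	}
	p, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return 0, err
	}
	return uint16(p), nil
}
```

A configured port of 0 combined with a listen address like `127.0.0.1:40123` yields 40123, which is one case where the returned port differs from the configured one.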