Merge pull request #2743 from nspcc-dev/log-fan-out
Logarithmic gossip fan out
This commit is contained in:
commit
dce9f80585
10 changed files with 169 additions and 128 deletions
|
@ -69,7 +69,7 @@ ApplicationConfiguration:
|
||||||
PingTimeout: 90
|
PingTimeout: 90
|
||||||
MaxPeers: 100
|
MaxPeers: 100
|
||||||
AttemptConnPeers: 20
|
AttemptConnPeers: 20
|
||||||
MinPeers: 5
|
MinPeers: 10
|
||||||
Oracle:
|
Oracle:
|
||||||
Enabled: false
|
Enabled: false
|
||||||
AllowedContentTypes:
|
AllowedContentTypes:
|
||||||
|
|
|
@ -72,7 +72,7 @@ ApplicationConfiguration:
|
||||||
PingTimeout: 90
|
PingTimeout: 90
|
||||||
MaxPeers: 100
|
MaxPeers: 100
|
||||||
AttemptConnPeers: 20
|
AttemptConnPeers: 20
|
||||||
MinPeers: 5
|
MinPeers: 10
|
||||||
Oracle:
|
Oracle:
|
||||||
Enabled: false
|
Enabled: false
|
||||||
AllowedContentTypes:
|
AllowedContentTypes:
|
||||||
|
|
|
@ -19,6 +19,7 @@ node-related settings described in the table below.
|
||||||
| Address | `string` | `0.0.0.0` | Node address that P2P protocol handler binds to. |
|
| Address | `string` | `0.0.0.0` | Node address that P2P protocol handler binds to. |
|
||||||
| AnnouncedPort | `uint16` | Same as `NodePort` | Node port which should be used to announce node's port on P2P layer, it can differ from the `NodePort` the node is bound to (for example, if your node is behind NAT). |
|
| AnnouncedPort | `uint16` | Same as `NodePort` | Node port which should be used to announce node's port on P2P layer, it can differ from the `NodePort` the node is bound to (for example, if your node is behind NAT). |
|
||||||
| AttemptConnPeers | `int` | `20` | Number of connection to try to establish when the connection count drops below the `MinPeers` value.|
|
| AttemptConnPeers | `int` | `20` | Number of connection to try to establish when the connection count drops below the `MinPeers` value.|
|
||||||
|
| BroadcastFactor | `int` | `0` | Multiplier that is used to determine the number of optimal gossip fan-out peer number for broadcasted messages (0-100). By default it's zero, node uses the most optimized value depending on the estimated network size (`2.5×log(size)`), so the node may have 20 peers and calculate that it needs to broadcast messages to just 10 of them. With BroadcastFactor set to 100 it will always send messages to all peers, any value in-between 0 and 100 is used for weighted calculation, for example if it's 30 then 13 neighbors will be used in the previous case. |
|
||||||
| DBConfiguration | [DB Configuration](#DB-Configuration) | | Describes configuration for database. See the [DB Configuration](#DB-Configuration) section for details. |
|
| DBConfiguration | [DB Configuration](#DB-Configuration) | | Describes configuration for database. See the [DB Configuration](#DB-Configuration) section for details. |
|
||||||
| DialTimeout | `int64` | `0` | Maximum duration a single dial may take in seconds. |
|
| DialTimeout | `int64` | `0` | Maximum duration a single dial may take in seconds. |
|
||||||
| ExtensiblePoolSize | `int` | `20` | Maximum amount of the extensible payloads from a single sender stored in a local pool. |
|
| ExtensiblePoolSize | `int` | `20` | Maximum amount of the extensible payloads from a single sender stored in a local pool. |
|
||||||
|
|
|
@ -9,6 +9,8 @@ type ApplicationConfiguration struct {
|
||||||
Address string `yaml:"Address"`
|
Address string `yaml:"Address"`
|
||||||
AnnouncedNodePort uint16 `yaml:"AnnouncedPort"`
|
AnnouncedNodePort uint16 `yaml:"AnnouncedPort"`
|
||||||
AttemptConnPeers int `yaml:"AttemptConnPeers"`
|
AttemptConnPeers int `yaml:"AttemptConnPeers"`
|
||||||
|
// BroadcastFactor is the factor (0-100) controlling gossip fan-out number optimization.
|
||||||
|
BroadcastFactor int `yaml:"BroadcastFactor"`
|
||||||
DBConfiguration dbconfig.DBConfiguration `yaml:"DBConfiguration"`
|
DBConfiguration dbconfig.DBConfiguration `yaml:"DBConfiguration"`
|
||||||
DialTimeout int64 `yaml:"DialTimeout"`
|
DialTimeout int64 `yaml:"DialTimeout"`
|
||||||
LogPath string `yaml:"LogPath"`
|
LogPath string `yaml:"LogPath"`
|
||||||
|
@ -36,6 +38,7 @@ func (a *ApplicationConfiguration) EqualsButServices(o *ApplicationConfiguration
|
||||||
if a.Address != o.Address ||
|
if a.Address != o.Address ||
|
||||||
a.AnnouncedNodePort != o.AnnouncedNodePort ||
|
a.AnnouncedNodePort != o.AnnouncedNodePort ||
|
||||||
a.AttemptConnPeers != o.AttemptConnPeers ||
|
a.AttemptConnPeers != o.AttemptConnPeers ||
|
||||||
|
a.BroadcastFactor != o.BroadcastFactor ||
|
||||||
a.DBConfiguration != o.DBConfiguration ||
|
a.DBConfiguration != o.DBConfiguration ||
|
||||||
a.DialTimeout != o.DialTimeout ||
|
a.DialTimeout != o.DialTimeout ||
|
||||||
a.ExtensiblePoolSize != o.ExtensiblePoolSize ||
|
a.ExtensiblePoolSize != o.ExtensiblePoolSize ||
|
||||||
|
|
|
@ -1,14 +1,16 @@
|
||||||
package network
|
package network
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
"github.com/nspcc-dev/neo-go/pkg/network/capability"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxPoolSize = 200
|
maxPoolSize = 10000
|
||||||
connRetries = 3
|
connRetries = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -16,7 +18,8 @@ const (
|
||||||
// a healthy connection pool.
|
// a healthy connection pool.
|
||||||
type Discoverer interface {
|
type Discoverer interface {
|
||||||
BackFill(...string)
|
BackFill(...string)
|
||||||
Close()
|
GetFanOut() int
|
||||||
|
NetworkSize() int
|
||||||
PoolCount() int
|
PoolCount() int
|
||||||
RequestRemote(int)
|
RequestRemote(int)
|
||||||
RegisterBadAddr(string)
|
RegisterBadAddr(string)
|
||||||
|
@ -39,17 +42,15 @@ type DefaultDiscovery struct {
|
||||||
seeds []string
|
seeds []string
|
||||||
transport Transporter
|
transport Transporter
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
closeMtx sync.RWMutex
|
|
||||||
dialTimeout time.Duration
|
dialTimeout time.Duration
|
||||||
badAddrs map[string]bool
|
badAddrs map[string]bool
|
||||||
connectedAddrs map[string]bool
|
connectedAddrs map[string]bool
|
||||||
goodAddrs map[string]capability.Capabilities
|
goodAddrs map[string]capability.Capabilities
|
||||||
unconnectedAddrs map[string]int
|
unconnectedAddrs map[string]int
|
||||||
attempted map[string]bool
|
attempted map[string]bool
|
||||||
isDead bool
|
optimalFanOut int32
|
||||||
|
networkSize int32
|
||||||
requestCh chan int
|
requestCh chan int
|
||||||
pool chan string
|
|
||||||
runExit chan struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDefaultDiscovery returns a new DefaultDiscovery.
|
// NewDefaultDiscovery returns a new DefaultDiscovery.
|
||||||
|
@ -64,10 +65,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa
|
||||||
unconnectedAddrs: make(map[string]int),
|
unconnectedAddrs: make(map[string]int),
|
||||||
attempted: make(map[string]bool),
|
attempted: make(map[string]bool),
|
||||||
requestCh: make(chan int),
|
requestCh: make(chan int),
|
||||||
pool: make(chan string, maxPoolSize),
|
|
||||||
runExit: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
go d.run()
|
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,54 +77,92 @@ func newDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) Disco
|
||||||
// the pool with the given addresses.
|
// the pool with the given addresses.
|
||||||
func (d *DefaultDiscovery) BackFill(addrs ...string) {
|
func (d *DefaultDiscovery) BackFill(addrs ...string) {
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
|
d.backfill(addrs...)
|
||||||
|
d.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DefaultDiscovery) backfill(addrs ...string) {
|
||||||
for _, addr := range addrs {
|
for _, addr := range addrs {
|
||||||
if d.badAddrs[addr] || d.connectedAddrs[addr] ||
|
if d.badAddrs[addr] || d.connectedAddrs[addr] ||
|
||||||
d.unconnectedAddrs[addr] > 0 {
|
d.unconnectedAddrs[addr] > 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
d.unconnectedAddrs[addr] = connRetries
|
|
||||||
d.pushToPoolOrDrop(addr)
|
d.pushToPoolOrDrop(addr)
|
||||||
}
|
}
|
||||||
d.lock.Unlock()
|
d.updateNetSize()
|
||||||
}
|
}
|
||||||
|
|
||||||
// PoolCount returns the number of the available node addresses.
|
// PoolCount returns the number of the available node addresses.
|
||||||
func (d *DefaultDiscovery) PoolCount() int {
|
func (d *DefaultDiscovery) PoolCount() int {
|
||||||
return len(d.pool)
|
d.lock.RLock()
|
||||||
|
defer d.lock.RUnlock()
|
||||||
|
return d.poolCount()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DefaultDiscovery) poolCount() int {
|
||||||
|
return len(d.unconnectedAddrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pushToPoolOrDrop tries to push the address given into the pool, but if the pool
|
// pushToPoolOrDrop tries to push the address given into the pool, but if the pool
|
||||||
// is already full, it just drops it.
|
// is already full, it just drops it.
|
||||||
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
|
func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) {
|
||||||
select {
|
if len(d.unconnectedAddrs) < maxPoolSize {
|
||||||
case d.pool <- addr:
|
d.unconnectedAddrs[addr] = connRetries
|
||||||
updatePoolCountMetric(d.PoolCount())
|
|
||||||
// ok, queued
|
|
||||||
default:
|
|
||||||
// whatever
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestRemote tries to establish a connection with n nodes.
|
// RequestRemote tries to establish a connection with n nodes.
|
||||||
func (d *DefaultDiscovery) RequestRemote(n int) {
|
func (d *DefaultDiscovery) RequestRemote(requested int) {
|
||||||
d.closeMtx.RLock()
|
for ; requested > 0; requested-- {
|
||||||
if !d.isDead {
|
var nextAddr string
|
||||||
d.requestCh <- n
|
d.lock.Lock()
|
||||||
|
for addr := range d.unconnectedAddrs {
|
||||||
|
if !d.connectedAddrs[addr] && !d.attempted[addr] {
|
||||||
|
nextAddr = addr
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if nextAddr == "" {
|
||||||
|
// Empty pool, try seeds.
|
||||||
|
for _, addr := range d.seeds {
|
||||||
|
if !d.connectedAddrs[addr] && !d.attempted[addr] {
|
||||||
|
nextAddr = addr
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if nextAddr == "" {
|
||||||
|
d.lock.Unlock()
|
||||||
|
// The pool is empty, but all seed nodes are already connected (or attempted),
|
||||||
|
// we can end up in an infinite loop here, so drop the request.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
d.attempted[nextAddr] = true
|
||||||
|
d.lock.Unlock()
|
||||||
|
go d.tryAddress(nextAddr)
|
||||||
}
|
}
|
||||||
d.closeMtx.RUnlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterBadAddr registers the given address as a bad address.
|
// RegisterBadAddr registers the given address as a bad address.
|
||||||
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
|
func (d *DefaultDiscovery) RegisterBadAddr(addr string) {
|
||||||
|
var isSeed bool
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
|
for _, seed := range d.seeds {
|
||||||
|
if addr == seed {
|
||||||
|
isSeed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !isSeed {
|
||||||
d.unconnectedAddrs[addr]--
|
d.unconnectedAddrs[addr]--
|
||||||
if d.unconnectedAddrs[addr] > 0 {
|
if d.unconnectedAddrs[addr] <= 0 {
|
||||||
d.pushToPoolOrDrop(addr)
|
|
||||||
} else {
|
|
||||||
d.badAddrs[addr] = true
|
d.badAddrs[addr] = true
|
||||||
delete(d.unconnectedAddrs, addr)
|
delete(d.unconnectedAddrs, addr)
|
||||||
delete(d.goodAddrs, addr)
|
delete(d.goodAddrs, addr)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
d.updateNetSize()
|
||||||
d.lock.Unlock()
|
d.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,6 +217,7 @@ func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities)
|
||||||
func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) {
|
func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) {
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
delete(d.connectedAddrs, s)
|
delete(d.connectedAddrs, s)
|
||||||
|
d.backfill(s)
|
||||||
d.lock.Unlock()
|
d.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,9 +226,34 @@ func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) {
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
delete(d.unconnectedAddrs, addr)
|
delete(d.unconnectedAddrs, addr)
|
||||||
d.connectedAddrs[addr] = true
|
d.connectedAddrs[addr] = true
|
||||||
|
d.updateNetSize()
|
||||||
d.lock.Unlock()
|
d.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetFanOut returns the optimal number of nodes to broadcast packets to.
|
||||||
|
func (d *DefaultDiscovery) GetFanOut() int {
|
||||||
|
return int(atomic.LoadInt32(&d.optimalFanOut))
|
||||||
|
}
|
||||||
|
|
||||||
|
// NetworkSize returns the estimated network size.
|
||||||
|
func (d *DefaultDiscovery) NetworkSize() int {
|
||||||
|
return int(atomic.LoadInt32(&d.networkSize))
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateNetSize updates network size estimation metric. Must be called under read lock.
|
||||||
|
func (d *DefaultDiscovery) updateNetSize() {
|
||||||
|
var netsize = len(d.connectedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself.
|
||||||
|
var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers.
|
||||||
|
if netsize == 2 { // log(1) == 0.
|
||||||
|
fanOut = 1 // But we still want to push messages to the peer.
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.StoreInt32(&d.optimalFanOut, int32(fanOut+0.5)) // Truncating conversion, hence +0.5.
|
||||||
|
atomic.StoreInt32(&d.networkSize, int32(netsize))
|
||||||
|
updateNetworkSizeMetric(netsize)
|
||||||
|
updatePoolCountMetric(d.poolCount())
|
||||||
|
}
|
||||||
|
|
||||||
func (d *DefaultDiscovery) tryAddress(addr string) {
|
func (d *DefaultDiscovery) tryAddress(addr string) {
|
||||||
err := d.transport.Dial(addr, d.dialTimeout)
|
err := d.transport.Dial(addr, d.dialTimeout)
|
||||||
d.lock.Lock()
|
d.lock.Lock()
|
||||||
|
@ -202,76 +264,3 @@ func (d *DefaultDiscovery) tryAddress(addr string) {
|
||||||
d.RequestRemote(1)
|
d.RequestRemote(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close stops discoverer pool processing, which makes the discoverer almost useless.
|
|
||||||
func (d *DefaultDiscovery) Close() {
|
|
||||||
d.closeMtx.Lock()
|
|
||||||
d.isDead = true
|
|
||||||
d.closeMtx.Unlock()
|
|
||||||
select {
|
|
||||||
case <-d.requestCh: // Drain the channel if there is anything there.
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
close(d.requestCh)
|
|
||||||
<-d.runExit
|
|
||||||
}
|
|
||||||
|
|
||||||
// run is a goroutine that makes DefaultDiscovery process its queue to connect
|
|
||||||
// to other nodes.
|
|
||||||
func (d *DefaultDiscovery) run() {
|
|
||||||
var requested, oldRequest, r int
|
|
||||||
var ok bool
|
|
||||||
|
|
||||||
for {
|
|
||||||
if requested == 0 {
|
|
||||||
requested, ok = <-d.requestCh
|
|
||||||
}
|
|
||||||
oldRequest = requested
|
|
||||||
for ok && requested > 0 {
|
|
||||||
select {
|
|
||||||
case r, ok = <-d.requestCh:
|
|
||||||
if requested <= r {
|
|
||||||
requested = r
|
|
||||||
}
|
|
||||||
case addr := <-d.pool:
|
|
||||||
updatePoolCountMetric(d.PoolCount())
|
|
||||||
d.lock.Lock()
|
|
||||||
if !d.connectedAddrs[addr] && !d.attempted[addr] {
|
|
||||||
d.attempted[addr] = true
|
|
||||||
go d.tryAddress(addr)
|
|
||||||
requested--
|
|
||||||
}
|
|
||||||
d.lock.Unlock()
|
|
||||||
default: // Empty pool
|
|
||||||
var added int
|
|
||||||
d.lock.Lock()
|
|
||||||
for _, addr := range d.seeds {
|
|
||||||
if !d.connectedAddrs[addr] {
|
|
||||||
delete(d.badAddrs, addr)
|
|
||||||
d.unconnectedAddrs[addr] = connRetries
|
|
||||||
d.pushToPoolOrDrop(addr)
|
|
||||||
added++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
d.lock.Unlock()
|
|
||||||
// The pool is empty, but all seed nodes are already connected,
|
|
||||||
// we can end up in an infinite loop here, so drop the request.
|
|
||||||
if added == 0 {
|
|
||||||
requested = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// Special case, no connections after all attempts.
|
|
||||||
d.lock.RLock()
|
|
||||||
connected := len(d.connectedAddrs)
|
|
||||||
d.lock.RUnlock()
|
|
||||||
if connected == 0 {
|
|
||||||
time.Sleep(d.dialTimeout)
|
|
||||||
requested = oldRequest
|
|
||||||
}
|
|
||||||
}
|
|
||||||
close(d.runExit)
|
|
||||||
}
|
|
||||||
|
|
|
@ -74,6 +74,7 @@ func TestDefaultDiscoverer(t *testing.T) {
|
||||||
assert.Equal(t, 0, len(d.BadPeers()))
|
assert.Equal(t, 0, len(d.BadPeers()))
|
||||||
require.Equal(t, set1, set1D)
|
require.Equal(t, set1, set1D)
|
||||||
}
|
}
|
||||||
|
require.Equal(t, 2, d.GetFanOut())
|
||||||
|
|
||||||
// Request should make goroutines dial our addresses draining the pool.
|
// Request should make goroutines dial our addresses draining the pool.
|
||||||
d.RequestRemote(len(set1))
|
d.RequestRemote(len(set1))
|
||||||
|
@ -131,14 +132,13 @@ func TestDefaultDiscoverer(t *testing.T) {
|
||||||
for _, addr := range set1 {
|
for _, addr := range set1 {
|
||||||
d.UnregisterConnectedAddr(addr)
|
d.UnregisterConnectedAddr(addr)
|
||||||
}
|
}
|
||||||
assert.Equal(t, 0, len(d.UnconnectedPeers()))
|
assert.Equal(t, 2, len(d.UnconnectedPeers())) // They're re-added automatically.
|
||||||
assert.Equal(t, 0, len(d.BadPeers()))
|
assert.Equal(t, 0, len(d.BadPeers()))
|
||||||
assert.Equal(t, len(set1), len(d.GoodPeers()))
|
assert.Equal(t, len(set1), len(d.GoodPeers()))
|
||||||
require.Equal(t, 0, d.PoolCount())
|
require.Equal(t, 2, d.PoolCount())
|
||||||
|
|
||||||
// Now make Dial() fail and wait to see addresses in the bad list.
|
// Now make Dial() fail and wait to see addresses in the bad list.
|
||||||
atomic.StoreInt32(&ts.retFalse, 1)
|
atomic.StoreInt32(&ts.retFalse, 1)
|
||||||
d.BackFill(set1...)
|
|
||||||
assert.Equal(t, len(set1), d.PoolCount())
|
assert.Equal(t, len(set1), d.PoolCount())
|
||||||
set1D := d.UnconnectedPeers()
|
set1D := d.UnconnectedPeers()
|
||||||
sort.Strings(set1D)
|
sort.Strings(set1D)
|
||||||
|
@ -157,7 +157,7 @@ func TestDefaultDiscoverer(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
require.Equal(t, 0, d.PoolCount())
|
require.Eventually(t, func() bool { return d.PoolCount() == 0 }, 2*time.Second, 50*time.Millisecond)
|
||||||
sort.Strings(dialledBad)
|
sort.Strings(dialledBad)
|
||||||
for i := 0; i < len(set1); i++ {
|
for i := 0; i < len(set1); i++ {
|
||||||
for j := 0; j < connRetries; j++ {
|
for j := 0; j < connRetries; j++ {
|
||||||
|
@ -174,10 +174,6 @@ func TestDefaultDiscoverer(t *testing.T) {
|
||||||
assert.Equal(t, len(set1), len(d.BadPeers()))
|
assert.Equal(t, len(set1), len(d.BadPeers()))
|
||||||
assert.Equal(t, 0, len(d.GoodPeers()))
|
assert.Equal(t, 0, len(d.GoodPeers()))
|
||||||
require.Equal(t, 0, d.PoolCount())
|
require.Equal(t, 0, d.PoolCount())
|
||||||
|
|
||||||
// Close should work and subsequent RequestRemote is a no-op.
|
|
||||||
d.Close()
|
|
||||||
d.RequestRemote(42)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSeedDiscovery(t *testing.T) {
|
func TestSeedDiscovery(t *testing.T) {
|
||||||
|
|
|
@ -33,13 +33,22 @@ func (d *testDiscovery) BackFill(addrs ...string) {
|
||||||
defer d.Unlock()
|
defer d.Unlock()
|
||||||
d.backfill = append(d.backfill, addrs...)
|
d.backfill = append(d.backfill, addrs...)
|
||||||
}
|
}
|
||||||
func (d *testDiscovery) Close() {}
|
|
||||||
func (d *testDiscovery) PoolCount() int { return 0 }
|
func (d *testDiscovery) PoolCount() int { return 0 }
|
||||||
func (d *testDiscovery) RegisterBadAddr(addr string) {
|
func (d *testDiscovery) RegisterBadAddr(addr string) {
|
||||||
d.Lock()
|
d.Lock()
|
||||||
defer d.Unlock()
|
defer d.Unlock()
|
||||||
d.bad = append(d.bad, addr)
|
d.bad = append(d.bad, addr)
|
||||||
}
|
}
|
||||||
|
func (d *testDiscovery) GetFanOut() int {
|
||||||
|
d.Lock()
|
||||||
|
defer d.Unlock()
|
||||||
|
return (len(d.connected) + len(d.backfill)) * 2 / 3
|
||||||
|
}
|
||||||
|
func (d *testDiscovery) NetworkSize() int {
|
||||||
|
d.Lock()
|
||||||
|
defer d.Unlock()
|
||||||
|
return len(d.connected) + len(d.backfill)
|
||||||
|
}
|
||||||
func (d *testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {}
|
func (d *testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {}
|
||||||
func (d *testDiscovery) RegisterConnectedAddr(addr string) {
|
func (d *testDiscovery) RegisterConnectedAddr(addr string) {
|
||||||
d.Lock()
|
d.Lock()
|
||||||
|
@ -188,6 +197,5 @@ func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protoco
|
||||||
s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t),
|
s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t),
|
||||||
newFakeTransp, newTestDiscovery)
|
newFakeTransp, newTestDiscovery)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(s.discovery.Close)
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,14 @@ import (
|
||||||
|
|
||||||
// Metric used in monitoring service.
|
// Metric used in monitoring service.
|
||||||
var (
|
var (
|
||||||
|
estimatedNetworkSize = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Help: "Estimated network size",
|
||||||
|
Name: "network_size",
|
||||||
|
Namespace: "neogo",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
peersConnected = prometheus.NewGauge(
|
peersConnected = prometheus.NewGauge(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Help: "Number of connected peers",
|
Help: "Number of connected peers",
|
||||||
|
@ -42,6 +50,7 @@ var (
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(
|
prometheus.MustRegister(
|
||||||
|
estimatedNetworkSize,
|
||||||
peersConnected,
|
peersConnected,
|
||||||
servAndNodeVersion,
|
servAndNodeVersion,
|
||||||
poolCount,
|
poolCount,
|
||||||
|
@ -49,6 +58,10 @@ func init() {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateNetworkSizeMetric(sz int) {
|
||||||
|
estimatedNetworkSize.Set(float64(sz))
|
||||||
|
}
|
||||||
|
|
||||||
func updateBlockQueueLenMetric(bqLen int) {
|
func updateBlockQueueLenMetric(bqLen int) {
|
||||||
blockQueueLength.Set(float64(bqLen))
|
blockQueueLength.Set(float64(bqLen))
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ const (
|
||||||
defaultAttemptConnPeers = 20
|
defaultAttemptConnPeers = 20
|
||||||
defaultMaxPeers = 100
|
defaultMaxPeers = 100
|
||||||
defaultExtensiblePoolSize = 20
|
defaultExtensiblePoolSize = 20
|
||||||
|
defaultBroadcastFactor = 0
|
||||||
maxBlockBatch = 200
|
maxBlockBatch = 200
|
||||||
minPoolCount = 30
|
minPoolCount = 30
|
||||||
)
|
)
|
||||||
|
@ -222,6 +223,13 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
|
||||||
s.AttemptConnPeers = defaultAttemptConnPeers
|
s.AttemptConnPeers = defaultAttemptConnPeers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.BroadcastFactor < 0 || s.BroadcastFactor > 100 {
|
||||||
|
s.log.Info("bad BroadcastFactor configured, using the default value",
|
||||||
|
zap.Int("configured", s.BroadcastFactor),
|
||||||
|
zap.Int("actual", defaultBroadcastFactor))
|
||||||
|
s.BroadcastFactor = defaultBroadcastFactor
|
||||||
|
}
|
||||||
|
|
||||||
s.transport = newTransport(s)
|
s.transport = newTransport(s)
|
||||||
s.discovery = newDiscovery(
|
s.discovery = newDiscovery(
|
||||||
s.Seeds,
|
s.Seeds,
|
||||||
|
@ -261,7 +269,6 @@ func (s *Server) Start(errChan chan error) {
|
||||||
func (s *Server) Shutdown() {
|
func (s *Server) Shutdown() {
|
||||||
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
|
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
|
||||||
s.transport.Close()
|
s.transport.Close()
|
||||||
s.discovery.Close()
|
|
||||||
for _, p := range s.getPeers(nil) {
|
for _, p := range s.getPeers(nil) {
|
||||||
p.Disconnect(errServerShutdown)
|
p.Disconnect(errServerShutdown)
|
||||||
}
|
}
|
||||||
|
@ -380,10 +387,28 @@ func (s *Server) ConnectedPeers() []string {
|
||||||
// while itself dealing with peers management (handling connects/disconnects).
|
// while itself dealing with peers management (handling connects/disconnects).
|
||||||
func (s *Server) run() {
|
func (s *Server) run() {
|
||||||
go s.runProto()
|
go s.runProto()
|
||||||
for {
|
for loopCnt := 0; ; loopCnt++ {
|
||||||
if s.PeerCount() < s.MinPeers {
|
var (
|
||||||
|
netSize = s.discovery.NetworkSize()
|
||||||
|
// "Optimal" number of peers.
|
||||||
|
optimalN = s.discovery.GetFanOut() * 2
|
||||||
|
// Real number of peers.
|
||||||
|
peerN = s.PeerCount()
|
||||||
|
)
|
||||||
|
|
||||||
|
if peerN < s.MinPeers {
|
||||||
|
// Starting up or going below the minimum -> quickly get many new peers.
|
||||||
s.discovery.RequestRemote(s.AttemptConnPeers)
|
s.discovery.RequestRemote(s.AttemptConnPeers)
|
||||||
|
} else if s.MinPeers > 0 && loopCnt%s.MinPeers == 0 && optimalN > peerN && optimalN < s.MaxPeers && optimalN < netSize {
|
||||||
|
// Having some number of peers, but probably can get some more, the network is big.
|
||||||
|
// It also allows to start picking up new peers proactively, before we suddenly have <s.MinPeers of them.
|
||||||
|
var connN = s.AttemptConnPeers
|
||||||
|
if connN > optimalN-peerN {
|
||||||
|
connN = optimalN - peerN
|
||||||
}
|
}
|
||||||
|
s.discovery.RequestRemote(connN)
|
||||||
|
}
|
||||||
|
|
||||||
if s.discovery.PoolCount() < minPoolCount {
|
if s.discovery.PoolCount() < minPoolCount {
|
||||||
s.broadcastHPMessage(NewMessage(CMDGetAddr, payload.NewNullPayload()))
|
s.broadcastHPMessage(NewMessage(CMDGetAddr, payload.NewNullPayload()))
|
||||||
}
|
}
|
||||||
|
@ -439,11 +464,9 @@ func (s *Server) run() {
|
||||||
s.lock.RUnlock()
|
s.lock.RUnlock()
|
||||||
if !stillConnected {
|
if !stillConnected {
|
||||||
s.discovery.UnregisterConnectedAddr(addr)
|
s.discovery.UnregisterConnectedAddr(addr)
|
||||||
s.discovery.BackFill(addr)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
s.discovery.UnregisterConnectedAddr(addr)
|
s.discovery.UnregisterConnectedAddr(addr)
|
||||||
s.discovery.BackFill(addr)
|
|
||||||
}
|
}
|
||||||
updatePeersConnectedMetric(s.PeerCount())
|
updatePeersConnectedMetric(s.PeerCount())
|
||||||
} else {
|
} else {
|
||||||
|
@ -644,7 +667,6 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
|
||||||
return errInvalidNetwork
|
return errInvalidNetwork
|
||||||
}
|
}
|
||||||
peerAddr := p.PeerAddr().String()
|
peerAddr := p.PeerAddr().String()
|
||||||
s.discovery.RegisterConnectedAddr(peerAddr)
|
|
||||||
s.lock.RLock()
|
s.lock.RLock()
|
||||||
for peer := range s.peers {
|
for peer := range s.peers {
|
||||||
if p == peer {
|
if p == peer {
|
||||||
|
@ -658,6 +680,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.lock.RUnlock()
|
s.lock.RUnlock()
|
||||||
|
s.discovery.RegisterConnectedAddr(peerAddr)
|
||||||
return p.SendVersionAck(NewMessage(CMDVerack, payload.NewNullPayload()))
|
return p.SendVersionAck(NewMessage(CMDVerack, payload.NewNullPayload()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1354,8 +1377,13 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster.
|
var (
|
||||||
var ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2)
|
// Optimal number of recipients.
|
||||||
|
enoughN = s.discovery.GetFanOut()
|
||||||
|
replies = make(chan error, peerN) // Cache is there just to make goroutines exit faster.
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), s.TimePerBlock/2)
|
||||||
|
)
|
||||||
|
enoughN = (enoughN*(100-s.BroadcastFactor) + peerN*s.BroadcastFactor) / 100
|
||||||
for _, peer := range peers {
|
for _, peer := range peers {
|
||||||
go func(p Peer, ctx context.Context, pkt []byte) {
|
go func(p Peer, ctx context.Context, pkt []byte) {
|
||||||
// Do this before packet is sent, reader thread can get the reply before this routine wakes up.
|
// Do this before packet is sent, reader thread can get the reply before this routine wakes up.
|
||||||
|
@ -1377,8 +1405,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, context.C
|
||||||
if sentN+deadN == peerN {
|
if sentN+deadN == peerN {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Send to 2/3 of good peers.
|
if sentN >= enoughN && ctx.Err() == nil {
|
||||||
if 3*sentN >= 2*(peerN-deadN) && ctx.Err() == nil {
|
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,6 +78,9 @@ type (
|
||||||
|
|
||||||
// ExtensiblePoolSize is the size of the pool for extensible payloads from a single sender.
|
// ExtensiblePoolSize is the size of the pool for extensible payloads from a single sender.
|
||||||
ExtensiblePoolSize int
|
ExtensiblePoolSize int
|
||||||
|
|
||||||
|
// BroadcastFactor is the factor (0-100) for fan-out optimization.
|
||||||
|
BroadcastFactor int
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -107,5 +110,6 @@ func NewServerConfig(cfg config.Config) ServerConfig {
|
||||||
P2PNotaryCfg: appConfig.P2PNotary,
|
P2PNotaryCfg: appConfig.P2PNotary,
|
||||||
StateRootCfg: appConfig.StateRoot,
|
StateRootCfg: appConfig.StateRoot,
|
||||||
ExtensiblePoolSize: appConfig.ExtensiblePoolSize,
|
ExtensiblePoolSize: appConfig.ExtensiblePoolSize,
|
||||||
|
BroadcastFactor: appConfig.BroadcastFactor,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue