neoneo-go/pkg/network/discovery_test.go
Roman Khimov b8c09f509f network: add random slight delay to connection attempts
Small (especially dockerized/virtualized) networks often start all nodes at
once, and then we see a lot of connection flapping in the log. This happens
because nodes try to connect to each other simultaneously, establish two
connections, and then each side finds a duplicate and drops it, but the two
sides may drop different duplicates, so they retry and this goes on for some
time. Eventually everything settles, but we get a lot of garbage in the log
and a lot of useless attempts.

This random waiting timeout doesn't change the logic much and adds only a
minimal delay, but it increases the chances for both nodes to establish a
single proper connection first and only then see the second one and drop it
on both sides. This leads to almost no flapping in small networks and doesn't
noticeably affect bigger ones. The delay is close to unnoticeable, especially
if the node has something in the DB to process during startup.
2022-11-17 18:42:43 +03:00
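The fix boils down to sleeping for a small random interval before each outgoing connection attempt. A minimal sketch of the idea follows; dialWithJitter and maxJitter are assumed names used only for illustration, the actual neo-go implementation may differ:

// dialWithJitter sleeps for a random duration in [0, maxJitter) before
// dialing, so that two nodes starting at the same time rarely race into a
// pair of duplicate connections that both sides then have to drop.
// Hypothetical helper; assumes "math/rand" and "time" are imported.
func dialWithJitter(t Transporter, addr string, timeout, maxJitter time.Duration) (AddressablePeer, error) {
	time.Sleep(time.Duration(rand.Int63n(int64(maxJitter))))
	return t.Dial(addr, timeout)
}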


package network

import (
	"errors"
	"net"
	"sort"
	"sync/atomic"
	"testing"
	"time"

	"github.com/nspcc-dev/neo-go/pkg/network/capability"
	"github.com/nspcc-dev/neo-go/pkg/network/payload"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	atomic2 "go.uber.org/atomic"
)
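// fakeTransp is a stub Transporter for tests: Dial() reports each dialed
// address on dialCh and fails when retFalse is set, while started/closed
// track Accept()/Close() calls.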
type fakeTransp struct {
	retFalse int32
	started  atomic2.Bool
	closed   atomic2.Bool
	dialCh   chan string
	addr     string
}
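// fakeAPeer is a stub AddressablePeer built from plain address strings.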
type fakeAPeer struct {
	addr    string
	peer    string
	version *payload.Version
}

func (f *fakeAPeer) ConnectionAddr() string {
	return f.addr
}

func (f *fakeAPeer) PeerAddr() net.Addr {
	tcpAddr, err := net.ResolveTCPAddr("tcp", f.peer)
	if err != nil {
		panic(err)
	}
	return tcpAddr
}

func (f *fakeAPeer) Version() *payload.Version {
	return f.version
}
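// newFakeTransp returns an empty fakeTransp; the Server parameter is unused.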
func newFakeTransp(s *Server) Transporter {
	return &fakeTransp{}
}
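// Dial sends addr to dialCh and returns a fakeAPeer, together with an error
// if retFalse is set.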
func (ft *fakeTransp) Dial(addr string, timeout time.Duration) (AddressablePeer, error) {
	var ret error
	if atomic.LoadInt32(&ft.retFalse) > 0 {
		ret = errors.New("smth bad happened")
	}
	ft.dialCh <- addr
	return &fakeAPeer{addr: addr, peer: addr}, ret
}
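// Accept marks the transport as started with a fake listen address and
// panics if called twice.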
func (ft *fakeTransp) Accept() {
	if ft.started.Load() {
		panic("started twice")
	}
	ft.addr = net.JoinHostPort("0.0.0.0", "42")
	ft.started.Store(true)
}

func (ft *fakeTransp) Proto() string {
	return ""
}

func (ft *fakeTransp) Address() string {
	return ft.addr
}

func (ft *fakeTransp) Close() {
	if ft.closed.Load() {
		panic("closed twice")
	}
	ft.closed.Store(true)
}
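// TestDefaultDiscoverer checks the address lifecycle in DefaultDiscovery:
// backfilling, dialing, registering connected and good peers, unregistering
// them and moving unreachable addresses to the bad list.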
func TestDefaultDiscoverer(t *testing.T) {
	ts := &fakeTransp{}
	ts.dialCh = make(chan string)
	d := NewDefaultDiscovery(nil, time.Second/16, ts)
	tryMaxWait = 1 // Don't waste time.
	var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
	sort.Strings(set1)

	// Added addresses should end up in the pool and in the unconnected set.
	// Done twice to check re-adding unconnected addresses, which should be
	// a no-op.
	for i := 0; i < 2; i++ {
		d.BackFill(set1...)
		assert.Equal(t, len(set1), d.PoolCount())
		set1D := d.UnconnectedPeers()
		sort.Strings(set1D)
		assert.Equal(t, 0, len(d.GoodPeers()))
		assert.Equal(t, 0, len(d.BadPeers()))
		require.Equal(t, set1, set1D)
	}
	require.Equal(t, 2, d.GetFanOut())

	// Request should make goroutines dial our addresses draining the pool.
	d.RequestRemote(len(set1))
	dialled := make([]string, 0)
	for i := 0; i < len(set1); i++ {
		select {
		case a := <-ts.dialCh:
			dialled = append(dialled, a)
			d.RegisterConnected(&fakeAPeer{addr: a, peer: a})
		case <-time.After(time.Second):
			t.Fatalf("timeout expecting for transport dial")
		}
	}
	require.Eventually(t, func() bool { return len(d.UnconnectedPeers()) == 0 }, 2*time.Second, 50*time.Millisecond)
	sort.Strings(dialled)
	assert.Equal(t, 0, d.PoolCount())
	assert.Equal(t, 0, len(d.BadPeers()))
	assert.Equal(t, 0, len(d.GoodPeers()))
	require.Equal(t, set1, dialled)

	// Registered good addresses should end up in appropriate set.
	for _, addr := range set1 {
		d.RegisterGood(&fakeAPeer{
			addr: addr,
			peer: addr,
			version: &payload.Version{
				Capabilities: capability.Capabilities{{
					Type: capability.FullNode,
					Data: &capability.Node{StartHeight: 123},
				}},
			},
		})
	}
	gAddrWithCap := d.GoodPeers()
	gAddrs := make([]string, len(gAddrWithCap))
	for i, addr := range gAddrWithCap {
		require.Equal(t, capability.Capabilities{
			{
				Type: capability.FullNode,
				Data: &capability.Node{StartHeight: 123},
			},
		}, addr.Capabilities)
		gAddrs[i] = addr.Address
	}
	sort.Strings(gAddrs)
	assert.Equal(t, 0, d.PoolCount())
	assert.Equal(t, 0, len(d.UnconnectedPeers()))
	assert.Equal(t, 0, len(d.BadPeers()))
	require.Equal(t, set1, gAddrs)

	// Re-adding connected addresses should be no-op.
	d.BackFill(set1...)
	assert.Equal(t, 0, len(d.UnconnectedPeers()))
	assert.Equal(t, 0, len(d.BadPeers()))
	assert.Equal(t, len(set1), len(d.GoodPeers()))
	require.Equal(t, 0, d.PoolCount())

	// Unregistering connected should work.
	for _, addr := range set1 {
		d.UnregisterConnected(&fakeAPeer{addr: addr, peer: addr}, false)
	}
	assert.Equal(t, 2, len(d.UnconnectedPeers())) // They're re-added automatically.
	assert.Equal(t, 0, len(d.BadPeers()))
	assert.Equal(t, len(set1), len(d.GoodPeers()))
	require.Equal(t, 2, d.PoolCount())

	// Now make Dial() fail and wait to see addresses in the bad list.
	atomic.StoreInt32(&ts.retFalse, 1)
	assert.Equal(t, len(set1), d.PoolCount())
	set1D := d.UnconnectedPeers()
	sort.Strings(set1D)
	assert.Equal(t, 0, len(d.BadPeers()))
	require.Equal(t, set1, set1D)
	dialledBad := make([]string, 0)
	d.RequestRemote(len(set1))
	for i := 0; i < connRetries; i++ {
		for j := 0; j < len(set1); j++ {
			select {
			case a := <-ts.dialCh:
				dialledBad = append(dialledBad, a)
			case <-time.After(time.Second):
				t.Fatalf("timeout expecting for transport dial; i: %d, j: %d", i, j)
			}
		}
	}
	require.Eventually(t, func() bool { return d.PoolCount() == 0 }, 2*time.Second, 50*time.Millisecond)
	sort.Strings(dialledBad)
	for i := 0; i < len(set1); i++ {
		for j := 0; j < connRetries; j++ {
			assert.Equal(t, set1[i], dialledBad[i*connRetries+j])
		}
	}
	require.Eventually(t, func() bool { return len(d.BadPeers()) == len(set1) }, 2*time.Second, 50*time.Millisecond)
	assert.Equal(t, 0, len(d.GoodPeers()))
	assert.Equal(t, 0, len(d.UnconnectedPeers()))

	// Re-adding bad addresses is a no-op.
	d.BackFill(set1...)
	assert.Equal(t, 0, len(d.UnconnectedPeers()))
	assert.Equal(t, len(set1), len(d.BadPeers()))
	assert.Equal(t, 0, len(d.GoodPeers()))
	require.Equal(t, 0, d.PoolCount())
}
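// TestSeedDiscovery checks that seed addresses keep being retried even though
// every dial attempt fails.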
func TestSeedDiscovery(t *testing.T) {
	var seeds = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
	ts := &fakeTransp{}
	ts.dialCh = make(chan string)
	atomic.StoreInt32(&ts.retFalse, 1) // Fail all dial requests.
	sort.Strings(seeds)
	d := NewDefaultDiscovery(seeds, time.Second/10, ts)
	tryMaxWait = 1 // Don't waste time.

	d.RequestRemote(len(seeds))
	for i := 0; i < connRetries*2; i++ {
		for range seeds {
			select {
			case <-ts.dialCh:
			case <-time.After(time.Second):
				t.Fatalf("timeout expecting for transport dial")
			}
		}
	}
}