From 6ba4afc97773be7eaf2d1a66e50bc6046bfa0c1b Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 17 Nov 2022 16:40:29 +0300 Subject: [PATCH 1/6] network: consider handshaked peers only when comparing with MinPeers We don't know a lot about non-handshaked ones, so it's safer to try more connection. --- pkg/network/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 7210ddf86..be6a1d3a6 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -409,7 +409,7 @@ func (s *Server) run() { // "Optimal" number of peers. optimalN = s.discovery.GetFanOut() * 2 // Real number of peers. - peerN = s.PeerCount() + peerN = s.HandshakedPeersCount() ) if peerN < s.MinPeers { From 23f118a1a9d20ab8cc9d7e414a038898eaa3364c Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 17 Nov 2022 17:07:19 +0300 Subject: [PATCH 2/6] network: rework discoverer/server interaction * treat connected/handshaked peers separately in the discoverer, save "original" address for connected ones, it can be a name instead of IP and it's important to keep it to avoid reconnections * store name->IP mapping for seeds if and when they're connected to avoid reconnections * block seed if it's detected to be our own node (which is often the case for small private networks) * add an event for handshaked peers in the server, connected but non-handshaked ones are not really helpful for MinPeers or GetAddr logic Fixes #2796. --- pkg/network/discovery.go | 114 +++++++++++++++++++++++----------- pkg/network/discovery_test.go | 43 ++++++++++--- pkg/network/helper_test.go | 18 +++--- pkg/network/peer.go | 21 +++++-- pkg/network/server.go | 44 +++++-------- pkg/network/server_test.go | 3 + pkg/network/tcp_peer.go | 21 ++++--- pkg/network/tcp_peer_test.go | 4 +- pkg/network/tcp_transport.go | 10 +-- pkg/network/transport.go | 2 +- 10 files changed, 180 insertions(+), 100 deletions(-) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 2e762806c..9a6b96b23 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -22,10 +22,10 @@ type Discoverer interface { NetworkSize() int PoolCount() int RequestRemote(int) - RegisterBadAddr(string) - RegisterGoodAddr(string, capability.Capabilities) - RegisterConnectedAddr(string) - UnregisterConnectedAddr(string) + RegisterSelf(AddressablePeer) + RegisterGood(AddressablePeer) + RegisterConnected(AddressablePeer) + UnregisterConnected(AddressablePeer, bool) UnconnectedPeers() []string BadPeers() []string GoodPeers() []AddressWithCapabilities @@ -39,12 +39,13 @@ type AddressWithCapabilities struct { // DefaultDiscovery default implementation of the Discoverer interface. type DefaultDiscovery struct { - seeds []string + seeds map[string]string transport Transporter lock sync.RWMutex dialTimeout time.Duration badAddrs map[string]bool connectedAddrs map[string]bool + handshakedAddrs map[string]bool goodAddrs map[string]capability.Capabilities unconnectedAddrs map[string]int attempted map[string]bool @@ -55,12 +56,17 @@ type DefaultDiscovery struct { // NewDefaultDiscovery returns a new DefaultDiscovery. func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *DefaultDiscovery { + var seeds = make(map[string]string) + for i := range addrs { + seeds[addrs[i]] = "" + } d := &DefaultDiscovery{ - seeds: addrs, + seeds: seeds, transport: ts, dialTimeout: dt, badAddrs: make(map[string]bool), connectedAddrs: make(map[string]bool), + handshakedAddrs: make(map[string]bool), goodAddrs: make(map[string]capability.Capabilities), unconnectedAddrs: make(map[string]int), attempted: make(map[string]bool), @@ -83,7 +89,7 @@ func (d *DefaultDiscovery) BackFill(addrs ...string) { func (d *DefaultDiscovery) backfill(addrs ...string) { for _, addr := range addrs { - if d.badAddrs[addr] || d.connectedAddrs[addr] || + if d.badAddrs[addr] || d.connectedAddrs[addr] || d.handshakedAddrs[addr] || d.unconnectedAddrs[addr] > 0 { continue } @@ -117,7 +123,7 @@ func (d *DefaultDiscovery) RequestRemote(requested int) { var nextAddr string d.lock.Lock() for addr := range d.unconnectedAddrs { - if !d.connectedAddrs[addr] && !d.attempted[addr] { + if !d.connectedAddrs[addr] && !d.handshakedAddrs[addr] && !d.attempted[addr] { nextAddr = addr break } @@ -125,8 +131,8 @@ func (d *DefaultDiscovery) RequestRemote(requested int) { if nextAddr == "" { // Empty pool, try seeds. - for _, addr := range d.seeds { - if !d.connectedAddrs[addr] && !d.attempted[addr] { + for addr, ip := range d.seeds { + if ip == "" && !d.attempted[addr] { nextAddr = addr break } @@ -144,26 +150,33 @@ func (d *DefaultDiscovery) RequestRemote(requested int) { } } -// RegisterBadAddr registers the given address as a bad address. -func (d *DefaultDiscovery) RegisterBadAddr(addr string) { - var isSeed bool +// RegisterSelf registers the given Peer as a bad one, because it's our own node. +func (d *DefaultDiscovery) RegisterSelf(p AddressablePeer) { + var connaddr = p.ConnectionAddr() d.lock.Lock() - for _, seed := range d.seeds { - if addr == seed { - isSeed = true - break + delete(d.connectedAddrs, connaddr) + d.registerBad(connaddr, true) + d.registerBad(p.PeerAddr().String(), true) + d.lock.Unlock() +} + +func (d *DefaultDiscovery) registerBad(addr string, force bool) { + _, isSeed := d.seeds[addr] + if isSeed { + if !force { + d.seeds[addr] = "" + } else { + d.seeds[addr] = "forever" // That's our own address, so never try connecting to it. } - } - if !isSeed { + } else { d.unconnectedAddrs[addr]-- - if d.unconnectedAddrs[addr] <= 0 { + if d.unconnectedAddrs[addr] <= 0 || force { d.badAddrs[addr] = true delete(d.unconnectedAddrs, addr) delete(d.goodAddrs, addr) } } d.updateNetSize() - d.lock.Unlock() } // UnconnectedPeers returns all addresses of unconnected addrs. @@ -203,31 +216,53 @@ func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities { return addrs } -// RegisterGoodAddr registers a known good connected address that has passed +// RegisterGood registers a known good connected peer that has passed // handshake successfully. -func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) { +func (d *DefaultDiscovery) RegisterGood(p AddressablePeer) { + var s = p.PeerAddr().String() d.lock.Lock() - d.goodAddrs[s] = c + d.handshakedAddrs[s] = true + d.goodAddrs[s] = p.Version().Capabilities delete(d.badAddrs, s) d.lock.Unlock() } -// UnregisterConnectedAddr tells the discoverer that this address is no longer +// UnregisterConnected tells the discoverer that this peer is no longer // connected, but it is still considered a good one. -func (d *DefaultDiscovery) UnregisterConnectedAddr(s string) { +func (d *DefaultDiscovery) UnregisterConnected(p AddressablePeer, duplicate bool) { + var ( + peeraddr = p.PeerAddr().String() + connaddr = p.ConnectionAddr() + ) d.lock.Lock() - delete(d.connectedAddrs, s) - d.backfill(s) + delete(d.connectedAddrs, connaddr) + if !duplicate { + for addr, ip := range d.seeds { + if ip == peeraddr { + d.seeds[addr] = "" + break + } + } + delete(d.handshakedAddrs, peeraddr) + if _, ok := d.goodAddrs[peeraddr]; ok { + d.backfill(peeraddr) + } + } d.lock.Unlock() } -// RegisterConnectedAddr tells discoverer that the given address is now connected. -func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) { +// RegisterConnected tells discoverer that the given peer is now connected. +func (d *DefaultDiscovery) RegisterConnected(p AddressablePeer) { + var addr = p.ConnectionAddr() d.lock.Lock() + d.registerConnected(addr) + d.lock.Unlock() +} + +func (d *DefaultDiscovery) registerConnected(addr string) { delete(d.unconnectedAddrs, addr) d.connectedAddrs[addr] = true d.updateNetSize() - d.lock.Unlock() } // GetFanOut returns the optimal number of nodes to broadcast packets to. @@ -242,9 +277,9 @@ func (d *DefaultDiscovery) NetworkSize() int { // updateNetSize updates network size estimation metric. Must be called under read lock. func (d *DefaultDiscovery) updateNetSize() { - var netsize = len(d.connectedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself. - var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers. - if netsize == 2 { // log(1) == 0. + var netsize = len(d.handshakedAddrs) + len(d.unconnectedAddrs) + 1 // 1 for the node itself. + var fanOut = 2.5 * math.Log(float64(netsize-1)) // -1 for the number of potential peers. + if netsize == 2 { // log(1) == 0. fanOut = 1 // But we still want to push messages to the peer. } @@ -255,12 +290,19 @@ func (d *DefaultDiscovery) updateNetSize() { } func (d *DefaultDiscovery) tryAddress(addr string) { - err := d.transport.Dial(addr, d.dialTimeout) + p, err := d.transport.Dial(addr, d.dialTimeout) d.lock.Lock() delete(d.attempted, addr) + if err == nil { + if _, ok := d.seeds[addr]; ok { + d.seeds[addr] = p.PeerAddr().String() + } + d.registerConnected(addr) + } else { + d.registerBad(addr, false) + } d.lock.Unlock() if err != nil { - d.RegisterBadAddr(addr) time.Sleep(d.dialTimeout) d.RequestRemote(1) } diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index fdcea6af1..f5df858a5 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/nspcc-dev/neo-go/pkg/network/capability" + "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" atomic2 "go.uber.org/atomic" @@ -22,18 +23,40 @@ type fakeTransp struct { addr string } +type fakeAPeer struct { + addr string + peer string + version *payload.Version +} + +func (f *fakeAPeer) ConnectionAddr() string { + return f.addr +} + +func (f *fakeAPeer) PeerAddr() net.Addr { + tcpAddr, err := net.ResolveTCPAddr("tcp", f.peer) + if err != nil { + panic(err) + } + return tcpAddr +} + +func (f *fakeAPeer) Version() *payload.Version { + return f.version +} + func newFakeTransp(s *Server) Transporter { return &fakeTransp{} } -func (ft *fakeTransp) Dial(addr string, timeout time.Duration) error { +func (ft *fakeTransp) Dial(addr string, timeout time.Duration) (AddressablePeer, error) { var ret error if atomic.LoadInt32(&ft.retFalse) > 0 { ret = errors.New("smth bad happened") } ft.dialCh <- addr - return ret + return &fakeAPeer{addr: addr, peer: addr}, ret } func (ft *fakeTransp) Accept() { if ft.started.Load() { @@ -83,7 +106,7 @@ func TestDefaultDiscoverer(t *testing.T) { select { case a := <-ts.dialCh: dialled = append(dialled, a) - d.RegisterConnectedAddr(a) + d.RegisterConnected(&fakeAPeer{addr: a, peer: a}) case <-time.After(time.Second): t.Fatalf("timeout expecting for transport dial") } @@ -97,10 +120,14 @@ func TestDefaultDiscoverer(t *testing.T) { // Registered good addresses should end up in appropriate set. for _, addr := range set1 { - d.RegisterGoodAddr(addr, capability.Capabilities{ - { - Type: capability.FullNode, - Data: &capability.Node{StartHeight: 123}, + d.RegisterGood(&fakeAPeer{ + addr: addr, + peer: addr, + version: &payload.Version{ + Capabilities: capability.Capabilities{{ + Type: capability.FullNode, + Data: &capability.Node{StartHeight: 123}, + }}, }, }) } @@ -130,7 +157,7 @@ func TestDefaultDiscoverer(t *testing.T) { // Unregistering connected should work. for _, addr := range set1 { - d.UnregisterConnectedAddr(addr) + d.UnregisterConnected(&fakeAPeer{addr: addr, peer: addr}, false) } assert.Equal(t, 2, len(d.UnconnectedPeers())) // They're re-added automatically. assert.Equal(t, 0, len(d.BadPeers())) diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index a44ae9f00..0a4f3f311 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -13,7 +13,6 @@ import ( "github.com/nspcc-dev/neo-go/internal/fakechain" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/io" - "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -35,10 +34,10 @@ func (d *testDiscovery) BackFill(addrs ...string) { d.backfill = append(d.backfill, addrs...) } func (d *testDiscovery) PoolCount() int { return 0 } -func (d *testDiscovery) RegisterBadAddr(addr string) { +func (d *testDiscovery) RegisterSelf(p AddressablePeer) { d.Lock() defer d.Unlock() - d.bad = append(d.bad, addr) + d.bad = append(d.bad, p.ConnectionAddr()) } func (d *testDiscovery) GetFanOut() int { d.Lock() @@ -50,16 +49,16 @@ func (d *testDiscovery) NetworkSize() int { defer d.Unlock() return len(d.connected) + len(d.backfill) } -func (d *testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {} -func (d *testDiscovery) RegisterConnectedAddr(addr string) { +func (d *testDiscovery) RegisterGood(AddressablePeer) {} +func (d *testDiscovery) RegisterConnected(p AddressablePeer) { d.Lock() defer d.Unlock() - d.connected = append(d.connected, addr) + d.connected = append(d.connected, p.ConnectionAddr()) } -func (d *testDiscovery) UnregisterConnectedAddr(addr string) { +func (d *testDiscovery) UnregisterConnected(p AddressablePeer, force bool) { d.Lock() defer d.Unlock() - d.unregistered = append(d.unregistered, addr) + d.unregistered = append(d.unregistered, p.ConnectionAddr()) } func (d *testDiscovery) UnconnectedPeers() []string { d.Lock() @@ -100,6 +99,9 @@ func newLocalPeer(t *testing.T, s *Server) *localPeer { } } +func (p *localPeer) ConnectionAddr() string { + return p.netaddr.String() +} func (p *localPeer) RemoteAddr() net.Addr { return &p.netaddr } diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 6dfcf16e0..2732c5a30 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -7,10 +7,12 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/payload" ) -// Peer represents a network node neo-go is connected to. -type Peer interface { - // RemoteAddr returns the remote address that we're connected to now. - RemoteAddr() net.Addr +type AddressablePeer interface { + // ConnectionAddr returns an address-like identifier of this connection + // before we have a proper one (after the handshake). It's either the + // address from discoverer (if initiated from node) or one from socket + // (if connected to node from outside). + ConnectionAddr() string // PeerAddr returns the remote address that should be used to establish // a new connection to the node. It can differ from the RemoteAddr // address in case the remote node is a client and its current @@ -18,6 +20,16 @@ type Peer interface { // to connect to it. It's only valid after the handshake is completed. // Before that, it returns the same address as RemoteAddr. PeerAddr() net.Addr + // Version returns peer's version message if the peer has handshaked + // already. + Version() *payload.Version +} + +// Peer represents a network node neo-go is connected to. +type Peer interface { + AddressablePeer + // RemoteAddr returns the remote address that we're connected to now. + RemoteAddr() net.Addr Disconnect(error) // BroadcastPacket is a context-bound packet enqueuer, it either puts the @@ -49,7 +61,6 @@ type Peer interface { // EnqueueHPPacket is similar to EnqueueHPMessage, but accepts a slice of // message(s) bytes. EnqueueHPPacket([]byte) error - Version() *payload.Version LastBlockIndex() uint32 Handshaked() bool IsFullNode() bool diff --git a/pkg/network/server.go b/pkg/network/server.go index be6a1d3a6..c31fb5f29 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -127,6 +127,7 @@ type ( register chan Peer unregister chan peerDrop + handshake chan Peer quit chan struct{} relayFin chan struct{} @@ -181,6 +182,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy relayFin: make(chan struct{}), register: make(chan Peer), unregister: make(chan peerDrop), + handshake: make(chan Peer), txInMap: make(map[util.Uint256]struct{}), peers: make(map[Peer]bool), syncReached: atomic.NewBool(false), @@ -462,31 +464,10 @@ func (s *Server) run() { zap.Stringer("addr", drop.peer.RemoteAddr()), zap.Error(drop.reason), zap.Int("peerCount", s.PeerCount())) - addr := drop.peer.PeerAddr().String() if errors.Is(drop.reason, errIdenticalID) { - s.discovery.RegisterBadAddr(addr) - } else if errors.Is(drop.reason, errAlreadyConnected) { - // There is a race condition when peer can be disconnected twice for the this reason - // which can lead to no connections to peer at all. Here we check for such a possibility. - stillConnected := false - s.lock.RLock() - verDrop := drop.peer.Version() - addr := drop.peer.PeerAddr().String() - if verDrop != nil { - for peer := range s.peers { - ver := peer.Version() - // Already connected, drop this connection. - if ver != nil && ver.Nonce == verDrop.Nonce && peer.PeerAddr().String() == addr { - stillConnected = true - } - } - } - s.lock.RUnlock() - if !stillConnected { - s.discovery.UnregisterConnectedAddr(addr) - } + s.discovery.RegisterSelf(drop.peer) } else { - s.discovery.UnregisterConnectedAddr(addr) + s.discovery.UnregisterConnected(drop.peer, errors.Is(drop.reason, errAlreadyConnected)) } updatePeersConnectedMetric(s.PeerCount()) } else { @@ -494,6 +475,19 @@ func (s *Server) run() { // because we have two goroutines sending signals here s.lock.Unlock() } + + case p := <-s.handshake: + ver := p.Version() + s.log.Info("started protocol", + zap.Stringer("addr", p.RemoteAddr()), + zap.ByteString("userAgent", ver.UserAgent), + zap.Uint32("startHeight", p.LastBlockIndex()), + zap.Uint32("id", ver.Nonce)) + + s.discovery.RegisterGood(p) + + s.tryInitStateSync() + s.tryStartServices() } } } @@ -700,7 +694,6 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { } } s.lock.RUnlock() - s.discovery.RegisterConnectedAddr(peerAddr) return p.SendVersionAck(NewMessage(CMDVerack, payload.NewNullPayload())) } @@ -1356,9 +1349,6 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { return err } go peer.StartProtocol() - - s.tryInitStateSync() - s.tryStartServices() default: return fmt.Errorf("received '%s' during handshake", msg.Command.String()) } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 0f6b1e344..4ad30687d 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -141,14 +141,17 @@ func TestServerRegisterPeer(t *testing.T) { for i := range ps { ps[i] = newLocalPeer(t, s) ps[i].netaddr.Port = i + 1 + ps[i].version = &payload.Version{Nonce: uint32(i), UserAgent: []byte("fake")} } startWithCleanup(t, s) s.register <- ps[0] require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) + s.handshake <- ps[0] s.register <- ps[1] + s.handshake <- ps[1] require.Eventually(t, func() bool { return 2 == s.PeerCount() }, time.Second, time.Millisecond*10) require.Equal(t, 0, len(s.discovery.UnconnectedPeers())) diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 392934615..a3e44e67d 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -13,7 +13,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" "go.uber.org/atomic" - "go.uber.org/zap" ) type handShakeStage uint8 @@ -48,6 +47,8 @@ type TCPPeer struct { version *payload.Version // Index of the last block. lastBlockIndex uint32 + // pre-handshake non-canonical connection address. + addr string lock sync.RWMutex finale sync.Once @@ -69,10 +70,11 @@ type TCPPeer struct { } // NewTCPPeer returns a TCPPeer structure based on the given connection. -func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { +func NewTCPPeer(conn net.Conn, addr string, s *Server) *TCPPeer { return &TCPPeer{ conn: conn, server: s, + addr: addr, done: make(chan struct{}), sendQ: make(chan []byte, requestQueueSize), p2pSendQ: make(chan []byte, p2pMsgQueueSize), @@ -256,13 +258,8 @@ func (p *TCPPeer) handleQueues() { func (p *TCPPeer) StartProtocol() { var err error - p.server.log.Info("started protocol", - zap.Stringer("addr", p.RemoteAddr()), - zap.ByteString("userAgent", p.Version().UserAgent), - zap.Uint32("startHeight", p.lastBlockIndex), - zap.Uint32("id", p.Version().Nonce)) + p.server.handshake <- p - p.server.discovery.RegisterGoodAddr(p.PeerAddr().String(), p.version.Capabilities) err = p.server.requestBlocksOrHeaders(p) if err != nil { p.Disconnect(err) @@ -384,6 +381,14 @@ func (p *TCPPeer) HandleVersionAck() error { return nil } +// ConnectionAddr implements the Peer interface. +func (p *TCPPeer) ConnectionAddr() string { + if p.addr != "" { + return p.addr + } + return p.conn.RemoteAddr().String() +} + // RemoteAddr implements the Peer interface. func (p *TCPPeer) RemoteAddr() net.Addr { return p.conn.RemoteAddr() diff --git a/pkg/network/tcp_peer_test.go b/pkg/network/tcp_peer_test.go index a4e2e8655..056bbf9b0 100644 --- a/pkg/network/tcp_peer_test.go +++ b/pkg/network/tcp_peer_test.go @@ -18,8 +18,8 @@ func connReadStub(conn net.Conn) { func TestPeerHandshake(t *testing.T) { server, client := net.Pipe() - tcpS := NewTCPPeer(server, newTestServer(t, ServerConfig{})) - tcpC := NewTCPPeer(client, newTestServer(t, ServerConfig{})) + tcpS := NewTCPPeer(server, "", newTestServer(t, ServerConfig{})) + tcpC := NewTCPPeer(client, "", newTestServer(t, ServerConfig{})) // Something should read things written into the pipe. go connReadStub(tcpS.conn) diff --git a/pkg/network/tcp_transport.go b/pkg/network/tcp_transport.go index dbce2ef53..b1b6662d0 100644 --- a/pkg/network/tcp_transport.go +++ b/pkg/network/tcp_transport.go @@ -30,14 +30,14 @@ func NewTCPTransport(s *Server, bindAddr string, log *zap.Logger) *TCPTransport } // Dial implements the Transporter interface. -func (t *TCPTransport) Dial(addr string, timeout time.Duration) error { +func (t *TCPTransport) Dial(addr string, timeout time.Duration) (AddressablePeer, error) { conn, err := net.DialTimeout("tcp", addr, timeout) if err != nil { - return err + return nil, err } - p := NewTCPPeer(conn, t.server) + p := NewTCPPeer(conn, addr, t.server) go p.handleConn() - return nil + return p, nil } // Accept implements the Transporter interface. @@ -69,7 +69,7 @@ func (t *TCPTransport) Accept() { t.log.Warn("TCP accept error", zap.Error(err)) continue } - p := NewTCPPeer(conn, t.server) + p := NewTCPPeer(conn, "", t.server) go p.handleConn() } } diff --git a/pkg/network/transport.go b/pkg/network/transport.go index 0f4d9e821..71228647b 100644 --- a/pkg/network/transport.go +++ b/pkg/network/transport.go @@ -5,7 +5,7 @@ import "time" // Transporter is an interface that allows us to abstract // any form of communication between the server and its peers. type Transporter interface { - Dial(addr string, timeout time.Duration) error + Dial(addr string, timeout time.Duration) (AddressablePeer, error) Accept() Proto() string Address() string From 1c7487b8e4de6dfabfeea334857da42e35353dd2 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 17 Nov 2022 17:32:05 +0300 Subject: [PATCH 3/6] network: add a timer to check for peers Consider initial connection phase for public networks: * simultaneous connections to seeds * very quick handshakes * got five handshaked peers and some getaddr requests sent * but addr replies won't trigger new connections * so we can stay with just five connections until any of them breaks or a (long) address checking timer fires This new timers solves the problem, it's adaptive at the same time. If we have enough peers we won't be waking up often. --- pkg/network/server.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index c31fb5f29..b3ce79bca 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -400,10 +400,12 @@ func (s *Server) ConnectedPeers() []string { func (s *Server) run() { var ( peerCheckTime = s.TimePerBlock * peerTimeFactor - peerCheckTimeout bool - timer = time.NewTimer(peerCheckTime) + addrCheckTimeout bool + addrTimer = time.NewTimer(peerCheckTime) + peerTimer = time.NewTimer(s.ProtoTickInterval) ) - defer timer.Stop() + defer addrTimer.Stop() + defer peerTimer.Stop() go s.runProto() for loopCnt := 0; ; loopCnt++ { var ( @@ -412,11 +414,15 @@ func (s *Server) run() { optimalN = s.discovery.GetFanOut() * 2 // Real number of peers. peerN = s.HandshakedPeersCount() + // Timeout value for the next peerTimer, long one by default. + peerT = peerCheckTime ) if peerN < s.MinPeers { // Starting up or going below the minimum -> quickly get many new peers. s.discovery.RequestRemote(s.AttemptConnPeers) + // Check/retry new connections soon. + peerT = s.ProtoTickInterval } else if s.MinPeers > 0 && loopCnt%s.MinPeers == 0 && optimalN > peerN && optimalN < s.MaxPeers && optimalN < netSize { // Having some number of peers, but probably can get some more, the network is big. // It also allows to start picking up new peers proactively, before we suddenly have Date: Thu, 17 Nov 2022 17:42:36 +0300 Subject: [PATCH 4/6] network: drop duplicationg check from handleAddrCmd() It was relevant with the queue-based discoverer, now it's not, discoverer handles this internally. --- pkg/network/server.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index b3ce79bca..4a8512e11 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1196,11 +1196,9 @@ func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error { if !p.CanProcessAddr() { return errors.New("unexpected addr received") } - dups := make(map[string]bool) for _, a := range addrs.Addrs { addr, err := a.GetTCPAddress() - if err == nil && !dups[addr] { - dups[addr] = true + if err == nil { s.discovery.BackFill(addr) } } From 075a54192c79dffffd7d8f5e46f10b495631f9e3 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 17 Nov 2022 18:03:04 +0300 Subject: [PATCH 5/6] network: don't try too many connections Consider mainnet, it has an AttemptConnPeers of 20, so may already have 3 peers and request 20 more, then have 4th connected and attemtp 20 more again, this leads to a huge number of connections easily. --- pkg/network/discovery.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index 9a6b96b23..f33e98980 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -49,6 +49,7 @@ type DefaultDiscovery struct { goodAddrs map[string]capability.Capabilities unconnectedAddrs map[string]int attempted map[string]bool + outstanding int32 optimalFanOut int32 networkSize int32 requestCh chan int @@ -119,6 +120,8 @@ func (d *DefaultDiscovery) pushToPoolOrDrop(addr string) { // RequestRemote tries to establish a connection with n nodes. func (d *DefaultDiscovery) RequestRemote(requested int) { + outstanding := int(atomic.LoadInt32(&d.outstanding)) + requested -= outstanding for ; requested > 0; requested-- { var nextAddr string d.lock.Lock() @@ -146,6 +149,7 @@ func (d *DefaultDiscovery) RequestRemote(requested int) { } d.attempted[nextAddr] = true d.lock.Unlock() + atomic.AddInt32(&d.outstanding, 1) go d.tryAddress(nextAddr) } } @@ -291,6 +295,7 @@ func (d *DefaultDiscovery) updateNetSize() { func (d *DefaultDiscovery) tryAddress(addr string) { p, err := d.transport.Dial(addr, d.dialTimeout) + atomic.AddInt32(&d.outstanding, -1) d.lock.Lock() delete(d.attempted, addr) if err == nil { From b8c09f509f27b23497f18d94d06642a2f608953c Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 17 Nov 2022 18:42:43 +0300 Subject: [PATCH 6/6] network: add random slight delay to connection attempts Small (especially dockerized/virtualized) networks often start all nodes at ones and then we see a lot of connection flapping in the log. This happens because nodes try to connect to each other simultaneously, establish two connections, then each one finds a duplicate and drops it, but this can be different duplicate connections on other sides, so they retry and it all happens for some time. Eventually everything settles, but we have a lot of garbage in the log and a lot of useless attempts. This random waiting timeout doesn't change the logic much, adds a minimal delay, but increases chances for both nodes to establish a proper single connection on both sides to only then see another one and drop it on both sides as well. It leads to almost no flapping in small networks, doesn't affect much bigger ones. The delay is close to unnoticeable especially if there is something in the DB for node to process during startup. --- pkg/network/discovery.go | 8 ++++++++ pkg/network/discovery_test.go | 2 ++ 2 files changed, 10 insertions(+) diff --git a/pkg/network/discovery.go b/pkg/network/discovery.go index f33e98980..5659ee6c0 100644 --- a/pkg/network/discovery.go +++ b/pkg/network/discovery.go @@ -2,6 +2,7 @@ package network import ( "math" + "math/rand" "sync" "sync/atomic" "time" @@ -14,6 +15,11 @@ const ( connRetries = 3 ) +var ( + // Maximum waiting time before connection attempt. + tryMaxWait = time.Second / 2 +) + // Discoverer is an interface that is responsible for maintaining // a healthy connection pool. type Discoverer interface { @@ -294,6 +300,8 @@ func (d *DefaultDiscovery) updateNetSize() { } func (d *DefaultDiscovery) tryAddress(addr string) { + var tout = rand.Int63n(int64(tryMaxWait)) + time.Sleep(time.Duration(tout)) // Have a sleep before working hard. p, err := d.transport.Dial(addr, d.dialTimeout) atomic.AddInt32(&d.outstanding, -1) d.lock.Lock() diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index f5df858a5..2c1d83fbf 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -82,6 +82,7 @@ func TestDefaultDiscoverer(t *testing.T) { ts.dialCh = make(chan string) d := NewDefaultDiscovery(nil, time.Second/16, ts) + tryMaxWait = 1 // Don't waste time. var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"} sort.Strings(set1) @@ -211,6 +212,7 @@ func TestSeedDiscovery(t *testing.T) { sort.Strings(seeds) d := NewDefaultDiscovery(seeds, time.Second/10, ts) + tryMaxWait = 1 // Don't waste time. d.RequestRemote(len(seeds)) for i := 0; i < connRetries*2; i++ {