neo-go/pkg/network/helper_test.go

214 lines
5.2 KiB
Go
Raw Normal View History

package network
import (
network: rework broadcast logic We have a number of queues for different purposes: * regular broadcast queue * direct p2p queue * high-priority queue And two basic egress scenarios: * direct p2p messages (replies to requests in Server's handle* methods) * broadcasted messages Low priority broadcasted messages: * transaction inventories * block inventories * notary inventories * non-consensus extensibles High-priority broadcasted messages: * consensus extensibles * getdata transaction requests from consensus process * getaddr requests P2P messages are a bit more complicated, most of the time they use p2p queue, but extensible message requests/replies use HP queue. Server's handle* code is run from Peer's handleIncoming, every peer has this thread that handles incoming messages. When working with the peer it's important to reply to requests and blocking this thread until we send (queue) a reply is fine, if the peer is slow we just won't get anything new from it. The queue used is irrelevant wrt this issue. Broadcasted messages are radically different, we want them to be delivered to many peers, but we don't care about specific ones. If it's delivered to 2/3 of the peers we're fine, if it's delivered to more of them --- it's not an issue. But doing this fairly is not an easy thing, current code tries performing unblocked sends and if this doesn't yield enough results it then blocks (but has a timeout, we can't wait indefinitely). But it does so in sequential manner, once the peer is chosen the code will wait for it (and only it) until timeout happens. What can be done instead is an attempt to push the message to all of the peers simultaneously (or close to that). If they all deliver --- OK, if some block and wait then we can wait until _any_ of them pushes the message through (or global timeout happens, we still can't wait forever). If we have enough deliveries then we can cancel pending ones and it's again not an error if these canceled threads still do their job. This makes the system more dynamic and adds some substantial processing overhead, but it's a networking code, any of this overhead is much lower than the actual packet delivery time. It also allows to spread the load more fairly, if there is any spare queue it'll get the packet and release the broadcaster. On the next broadcast iteration another peer is more likely to be chosen just because it didn't get a message previously (and had some time to deliver already queued messages). It works perfectly in tests, with optimal networking conditions we have much better block times and TPS increases by 5-25%% depending on the scenario. I'd go as far as to say that it fixes the original problem of #2678, because in this particular scenario we have empty queues in ~100% of the cases and this new logic will likely lead to 100% fan out in this case (cancelation just won't happen fast enough). But when the load grows and there is some waiting in the queue it will optimize out the slowest links.
2022-10-10 19:48:06 +00:00
"context"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/nspcc-dev/neo-go/internal/fakechain"
network: fix race in TestHandleGetMPTData Init server config before server start. Fixes the following data race: ``` WARNING: DATA RACE Write at 0x00c00032ef20 by goroutine 26: github.com/nspcc-dev/neo-go/pkg/network.TestHandleGetMPTData.func2() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:755 +0x10a testing.tRunner() /usr/local/go/src/testing/testing.go:1193 +0x202 Previous read at 0x00c00032ef20 by goroutine 24: github.com/nspcc-dev/neo-go/internal/fakechain.(*FakeChain).GetConfig() /go/src/github.com/nspcc-dev/neo-go/internal/fakechain/fakechain.go:167 +0x6f github.com/nspcc-dev/neo-go/pkg/network.(*Server).initStaleMemPools() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:1433 +0x89 github.com/nspcc-dev/neo-go/pkg/network.(*Server).Start() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:284 +0x288 github.com/nspcc-dev/neo-go/pkg/network.startWithChannel.func1() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:91 +0x44 Goroutine 26 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:1238 +0x5d7 github.com/nspcc-dev/neo-go/pkg/network.TestHandleGetMPTData() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:752 +0x8c testing.tRunner() /usr/local/go/src/testing/testing.go:1193 +0x202 Goroutine 24 (running) created at: github.com/nspcc-dev/neo-go/pkg/network.startWithChannel() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:90 +0x78 github.com/nspcc-dev/neo-go/pkg/network.startTestServer() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:384 +0xbd github.com/nspcc-dev/neo-go/pkg/network.TestHandleGetMPTData.func2() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:753 +0x55 testing.tRunner() /usr/local/go/src/testing/testing.go:1193 +0x202 ```
2021-09-13 08:41:54 +00:00
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/stretchr/testify/require"
2019-12-30 07:43:05 +00:00
"go.uber.org/zap/zaptest"
)
type testDiscovery struct {
sync.Mutex
bad []string
connected []string
unregistered []string
backfill []string
}
func newTestDiscovery([]string, time.Duration, Transporter) Discoverer { return new(testDiscovery) }
func (d *testDiscovery) BackFill(addrs ...string) {
d.Lock()
defer d.Unlock()
d.backfill = append(d.backfill, addrs...)
}
func (d *testDiscovery) PoolCount() int { return 0 }
func (d *testDiscovery) RegisterBadAddr(addr string) {
d.Lock()
defer d.Unlock()
d.bad = append(d.bad, addr)
}
func (d *testDiscovery) GetFanOut() int {
d.Lock()
defer d.Unlock()
return (len(d.connected) + len(d.backfill)) * 2 / 3
}
func (d *testDiscovery) NetworkSize() int {
d.Lock()
defer d.Unlock()
return len(d.connected) + len(d.backfill)
}
func (d *testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {}
func (d *testDiscovery) RegisterConnectedAddr(addr string) {
d.Lock()
defer d.Unlock()
d.connected = append(d.connected, addr)
}
func (d *testDiscovery) UnregisterConnectedAddr(addr string) {
d.Lock()
defer d.Unlock()
d.unregistered = append(d.unregistered, addr)
}
func (d *testDiscovery) UnconnectedPeers() []string {
d.Lock()
defer d.Unlock()
return d.unregistered
}
func (d *testDiscovery) RequestRemote(n int) {}
func (d *testDiscovery) BadPeers() []string {
d.Lock()
defer d.Unlock()
return d.bad
}
func (d *testDiscovery) GoodPeers() []AddressWithCapabilities { return []AddressWithCapabilities{} }
var defaultMessageHandler = func(t *testing.T, msg *Message) {}
type localPeer struct {
netaddr net.TCPAddr
server *Server
version *payload.Version
lastBlockIndex uint32
handshaked int32 // TODO: use atomic.Bool after #2626.
isFullNode bool
t *testing.T
messageHandler func(t *testing.T, msg *Message)
pingSent int
getAddrSent int
droppedWith atomic.Value
}
func newLocalPeer(t *testing.T, s *Server) *localPeer {
naddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
return &localPeer{
t: t,
server: s,
netaddr: *naddr,
messageHandler: defaultMessageHandler,
}
}
func (p *localPeer) RemoteAddr() net.Addr {
return &p.netaddr
}
func (p *localPeer) PeerAddr() net.Addr {
return &p.netaddr
}
func (p *localPeer) StartProtocol() {}
func (p *localPeer) Disconnect(err error) {
if p.droppedWith.Load() == nil {
p.droppedWith.Store(err)
}
fmt.Println("peer dropped:", err)
p.server.unregister <- peerDrop{p, err}
}
network: rework broadcast logic We have a number of queues for different purposes: * regular broadcast queue * direct p2p queue * high-priority queue And two basic egress scenarios: * direct p2p messages (replies to requests in Server's handle* methods) * broadcasted messages Low priority broadcasted messages: * transaction inventories * block inventories * notary inventories * non-consensus extensibles High-priority broadcasted messages: * consensus extensibles * getdata transaction requests from consensus process * getaddr requests P2P messages are a bit more complicated, most of the time they use p2p queue, but extensible message requests/replies use HP queue. Server's handle* code is run from Peer's handleIncoming, every peer has this thread that handles incoming messages. When working with the peer it's important to reply to requests and blocking this thread until we send (queue) a reply is fine, if the peer is slow we just won't get anything new from it. The queue used is irrelevant wrt this issue. Broadcasted messages are radically different, we want them to be delivered to many peers, but we don't care about specific ones. If it's delivered to 2/3 of the peers we're fine, if it's delivered to more of them --- it's not an issue. But doing this fairly is not an easy thing, current code tries performing unblocked sends and if this doesn't yield enough results it then blocks (but has a timeout, we can't wait indefinitely). But it does so in sequential manner, once the peer is chosen the code will wait for it (and only it) until timeout happens. What can be done instead is an attempt to push the message to all of the peers simultaneously (or close to that). If they all deliver --- OK, if some block and wait then we can wait until _any_ of them pushes the message through (or global timeout happens, we still can't wait forever). If we have enough deliveries then we can cancel pending ones and it's again not an error if these canceled threads still do their job. This makes the system more dynamic and adds some substantial processing overhead, but it's a networking code, any of this overhead is much lower than the actual packet delivery time. It also allows to spread the load more fairly, if there is any spare queue it'll get the packet and release the broadcaster. On the next broadcast iteration another peer is more likely to be chosen just because it didn't get a message previously (and had some time to deliver already queued messages). It works perfectly in tests, with optimal networking conditions we have much better block times and TPS increases by 5-25%% depending on the scenario. I'd go as far as to say that it fixes the original problem of #2678, because in this particular scenario we have empty queues in ~100% of the cases and this new logic will likely lead to 100% fan out in this case (cancelation just won't happen fast enough). But when the load grows and there is some waiting in the queue it will optimize out the slowest links.
2022-10-10 19:48:06 +00:00
func (p *localPeer) BroadcastPacket(_ context.Context, m []byte) error {
if len(m) == 0 {
return errors.New("empty msg")
}
msg := &Message{}
r := io.NewBinReaderFromBuf(m)
for r.Len() > 0 {
err := msg.Decode(r)
if err == nil {
p.messageHandler(p.t, msg)
}
}
return nil
}
func (p *localPeer) EnqueueP2PMessage(msg *Message) error {
return p.EnqueueHPMessage(msg)
}
func (p *localPeer) EnqueueP2PPacket(m []byte) error {
return p.BroadcastPacket(context.TODO(), m)
}
func (p *localPeer) BroadcastHPPacket(ctx context.Context, m []byte) error {
return p.BroadcastPacket(ctx, m)
}
func (p *localPeer) EnqueueHPMessage(msg *Message) error {
p.messageHandler(p.t, msg)
return nil
}
func (p *localPeer) EnqueueHPPacket(m []byte) error {
return p.BroadcastPacket(context.TODO(), m)
}
func (p *localPeer) Version() *payload.Version {
return p.version
}
func (p *localPeer) LastBlockIndex() uint32 {
return p.lastBlockIndex
}
func (p *localPeer) HandleVersion(v *payload.Version) error {
p.version = v
return nil
}
func (p *localPeer) SendVersion() error {
m, err := p.server.getVersionMsg()
if err != nil {
return err
}
_ = p.EnqueueHPMessage(m)
return nil
}
func (p *localPeer) SendVersionAck(m *Message) error {
_ = p.EnqueueHPMessage(m)
return nil
}
func (p *localPeer) HandleVersionAck() error {
atomic.StoreInt32(&p.handshaked, 1)
return nil
}
func (p *localPeer) SetPingTimer() {
p.pingSent++
}
func (p *localPeer) HandlePing(ping *payload.Ping) error {
p.lastBlockIndex = ping.LastBlockIndex
return nil
}
func (p *localPeer) HandlePong(pong *payload.Ping) error {
p.lastBlockIndex = pong.LastBlockIndex
p.pingSent--
return nil
}
func (p *localPeer) Handshaked() bool {
return atomic.LoadInt32(&p.handshaked) != 0
}
func (p *localPeer) IsFullNode() bool {
return p.isFullNode
}
func (p *localPeer) AddGetAddrSent() {
p.getAddrSent++
}
func (p *localPeer) CanProcessAddr() bool {
p.getAddrSent--
return p.getAddrSent >= 0
}
func newTestServer(t *testing.T, serverConfig ServerConfig) *Server {
network: fix race in TestHandleGetMPTData Init server config before server start. Fixes the following data race: ``` WARNING: DATA RACE Write at 0x00c00032ef20 by goroutine 26: github.com/nspcc-dev/neo-go/pkg/network.TestHandleGetMPTData.func2() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:755 +0x10a testing.tRunner() /usr/local/go/src/testing/testing.go:1193 +0x202 Previous read at 0x00c00032ef20 by goroutine 24: github.com/nspcc-dev/neo-go/internal/fakechain.(*FakeChain).GetConfig() /go/src/github.com/nspcc-dev/neo-go/internal/fakechain/fakechain.go:167 +0x6f github.com/nspcc-dev/neo-go/pkg/network.(*Server).initStaleMemPools() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:1433 +0x89 github.com/nspcc-dev/neo-go/pkg/network.(*Server).Start() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server.go:284 +0x288 github.com/nspcc-dev/neo-go/pkg/network.startWithChannel.func1() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:91 +0x44 Goroutine 26 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:1238 +0x5d7 github.com/nspcc-dev/neo-go/pkg/network.TestHandleGetMPTData() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:752 +0x8c testing.tRunner() /usr/local/go/src/testing/testing.go:1193 +0x202 Goroutine 24 (running) created at: github.com/nspcc-dev/neo-go/pkg/network.startWithChannel() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:90 +0x78 github.com/nspcc-dev/neo-go/pkg/network.startTestServer() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:384 +0xbd github.com/nspcc-dev/neo-go/pkg/network.TestHandleGetMPTData.func2() /go/src/github.com/nspcc-dev/neo-go/pkg/network/server_test.go:753 +0x55 testing.tRunner() /usr/local/go/src/testing/testing.go:1193 +0x202 ```
2021-09-13 08:41:54 +00:00
return newTestServerWithCustomCfg(t, serverConfig, nil)
}
func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protocolCfg func(*config.ProtocolConfiguration)) *Server {
s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t),
2022-01-12 20:04:07 +00:00
newFakeTransp, newTestDiscovery)
require.NoError(t, err)
return s
}