From 508d36f6986a83e39477516bcb14eb36a3b2866d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 12 Jan 2022 23:04:07 +0300 Subject: [PATCH] network: drop consensus dependency --- cli/executor_test.go | 12 +++++++++++ cli/server/server.go | 26 +++++++++++++++++++++++ pkg/network/helper_test.go | 7 ++++++- pkg/network/server.go | 42 ++++++++++++++------------------------ pkg/network/server_test.go | 35 +++++++++++-------------------- 5 files changed, 71 insertions(+), 51 deletions(-) diff --git a/cli/executor_test.go b/cli/executor_test.go index d1e4b1402..91a32ba88 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neo-go/cli/input" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -84,6 +85,17 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch serverConfig := network.NewServerConfig(cfg) netSrv, err := network.NewServer(serverConfig, chain, zap.NewNop()) require.NoError(t, err) + cons, err := consensus.NewService(consensus.Config{ + Logger: zap.NewNop(), + Broadcast: netSrv.BroadcastExtensible, + Chain: chain, + ProtocolConfiguration: chain.GetConfig(), + RequestTx: netSrv.RequestTx, + Wallet: serverConfig.Wallet, + TimePerBlock: serverConfig.TimePerBlock, + }) + require.NoError(t, err) + netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) go netSrv.Start(make(chan error, 1)) rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger) errCh := make(chan error, 2) diff --git a/cli/server/server.go b/cli/server/server.go index e2638a2e8..15a86a2e0 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -9,6 +9,7 @@ import ( "github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/chaindump" @@ -338,6 +339,27 @@ func mkOracle(config network.ServerConfig, chain *core.Blockchain, serv *network return orc, nil } +func mkConsensus(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (consensus.Service, error) { + if config.Wallet == nil { + return nil, nil + } + srv, err := consensus.NewService(consensus.Config{ + Logger: log, + Broadcast: serv.BroadcastExtensible, + Chain: chain, + ProtocolConfiguration: chain.GetConfig(), + RequestTx: serv.RequestTx, + Wallet: config.Wallet, + TimePerBlock: config.TimePerBlock, + }) + if err != nil { + return nil, fmt.Errorf("can't initialize Consensus module: %w", err) + } + + serv.AddExtensibleHPService(srv, consensus.Category, srv.OnPayload, srv.OnTransaction) + return srv, nil +} + func startServer(ctx *cli.Context) error { cfg, err := getConfigFromContext(ctx) if err != nil { @@ -372,6 +394,10 @@ func startServer(ctx *cli.Context) error { if err != nil { return err } + _, err = mkConsensus(serverConfig, chain, serv, log) + if err != nil { + return err + } rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) errChan := make(chan error) diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 23c399c91..9c65cd390 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/internal/fakechain" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" @@ -193,8 +194,12 @@ func newTestServer(t *testing.T, serverConfig ServerConfig) *Server { func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protocolCfg func(*config.ProtocolConfiguration)) *Server { s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), zaptest.NewLogger(t), - newFakeTransp, newFakeConsensus, newTestDiscovery) + newFakeTransp, newTestDiscovery) require.NoError(t, err) + if serverConfig.Wallet != nil { + cons := new(fakeConsensus) + s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + } t.Cleanup(s.discovery.Close) return s } diff --git a/pkg/network/server.go b/pkg/network/server.go index 732f31f94..f732e53ba 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -13,7 +13,6 @@ import ( "time" "github.com/nspcc-dev/neo-go/pkg/config/netmode" - "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" @@ -75,7 +74,6 @@ type ( chain blockchainer.Blockchainer bQueue *blockQueue bSyncQueue *blockQueue - consensus consensus.Service mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool @@ -83,6 +81,8 @@ type ( notaryModule *notary.Notary services []Service extensHandlers map[string]func(*payload.Extensible) error + extensHighPrio string + txCallback func(*transaction.Transaction) txInLock sync.Mutex txInMap map[util.Uint256]struct{} @@ -124,12 +124,11 @@ func randomID() uint32 { func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Logger) (*Server, error) { return newServerFromConstructors(config, chain, log, func(s *Server) Transporter { return NewTCPTransport(s, net.JoinHostPort(s.ServerConfig.Address, strconv.Itoa(int(s.ServerConfig.Port))), s.log) - }, consensus.NewService, newDefaultDiscovery) + }, newDefaultDiscovery) } func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Logger, newTransport func(*Server) Transporter, - newConsensus func(consensus.Config) (consensus.Service, error), newDiscovery func([]string, time.Duration, Transporter) Discoverer, ) (*Server, error) { if log == nil { @@ -198,25 +197,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai s.stateSync = sSync s.bSyncQueue = newBlockQueue(maxBlockBatch, sSync, log, nil) - if config.Wallet != nil { - srv, err := newConsensus(consensus.Config{ - Logger: log, - Broadcast: s.BroadcastExtensible, - Chain: chain, - ProtocolConfiguration: chain.GetConfig(), - RequestTx: s.requestTx, - Wallet: config.Wallet, - - TimePerBlock: config.TimePerBlock, - }) - if err != nil { - return nil, err - } - - s.consensus = srv - s.AddExtensibleService(srv, consensus.Category, srv.OnPayload) - } - if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -301,6 +281,13 @@ func (s *Server) AddExtensibleService(svc Service, category string, handler func s.AddService(svc) } +// AddExtensibleHPService registers a high-priority service that handles extensible payload of some kind. +func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { + s.txCallback = txCallback + s.extensHighPrio = category + s.AddExtensibleService(svc, category, handler) +} + // UnconnectedPeers returns a list of peers that are in the discovery peer list // but are not connected to the server. func (s *Server) UnconnectedPeers() []string { @@ -949,7 +936,7 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { func (s *Server) advertiseExtensible(e *payload.Extensible) { msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) - if e.Category == consensus.Category { + if e.Category == s.extensHighPrio { // It's high priority because it directly affects consensus process, // even though it's just an inv. s.broadcastHPMessage(msg) @@ -972,8 +959,8 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { s.txInMap[tx.Hash()] = struct{}{} s.txInLock.Unlock() if s.verifyAndPoolTX(tx) == nil { - if s.consensus != nil { - s.consensus.OnTransaction(tx) + if s.txCallback != nil { + s.txCallback(tx) } s.broadcastTX(tx, nil) } @@ -1255,7 +1242,8 @@ func (s *Server) BroadcastExtensible(p *payload.Extensible) { s.advertiseExtensible(p) } -func (s *Server) requestTx(hashes ...util.Uint256) { +// RequestTx asks for given transactions from Server peers using GetData message. +func (s *Server) RequestTx(hashes ...util.Uint256) { if len(hashes) == 0 { return } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index ec3bc9162..613cdac77 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" - "go.uber.org/zap/zaptest" ) type fakeConsensus struct { @@ -38,9 +37,6 @@ type fakeConsensus struct { var _ consensus.Service = (*fakeConsensus)(nil) -func newFakeConsensus(c consensus.Config) (consensus.Service, error) { - return new(fakeConsensus), nil -} func (f *fakeConsensus) Start() { f.started.Store(true) } func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { @@ -55,7 +51,7 @@ func TestNewServer(t *testing.T) { P2PStateExchangeExtensions: true, StateRootInHeader: true, }} - s, err := newServerFromConstructors(ServerConfig{}, bc, nil, newFakeTransp, newFakeConsensus, newTestDiscovery) + s, err := newServerFromConstructors(ServerConfig{}, bc, nil, newFakeTransp, newTestDiscovery) require.Error(t, err) t.Run("set defaults", func(t *testing.T) { @@ -79,13 +75,6 @@ func TestNewServer(t *testing.T) { require.Equal(t, 2, s.ServerConfig.MaxPeers) require.Equal(t, 3, s.ServerConfig.AttemptConnPeers) }) - t.Run("consensus error is not dropped", func(t *testing.T) { - errConsensus := errors.New("can't create consensus") - _, err = newServerFromConstructors(ServerConfig{Wallet: new(config.Wallet), MinPeers: -1}, bc, zaptest.NewLogger(t), newFakeTransp, - func(consensus.Config) (consensus.Service, error) { return nil, errConsensus }, - newTestDiscovery) - require.True(t, errors.Is(err, errConsensus), "got: %#v", err) - }) } func startWithChannel(s *Server) chan error { @@ -107,7 +96,7 @@ func TestServerStartAndShutdown(t *testing.T) { require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) assert.True(t, s.transport.(*fakeTransp).started.Load()) - assert.Nil(t, s.consensus) + assert.Nil(t, s.txCallback) s.Shutdown() <-ch @@ -124,12 +113,12 @@ func TestServerStartAndShutdown(t *testing.T) { p := newLocalPeer(t, s) s.register <- p - assert.True(t, s.consensus.(*fakeConsensus).started.Load()) + assert.True(t, s.services[0].(*fakeConsensus).started.Load()) s.Shutdown() <-ch - require.True(t, s.consensus.(*fakeConsensus).stopped.Load()) + require.True(t, s.services[0].(*fakeConsensus).stopped.Load()) }) } @@ -441,13 +430,13 @@ func TestConsensus(t *testing.T) { s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil } require.NoError(t, s.handleMessage(p, msg)) - require.Contains(t, s.consensus.(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.Contains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) t.Run("small ValidUntilBlockEnd", func(t *testing.T) { t.Run("current height", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()) require.NoError(t, s.handleMessage(p, msg)) - require.NotContains(t, s.consensus.(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.NotContains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) }) t.Run("invalid", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()-1) @@ -478,13 +467,13 @@ func TestTransaction(t *testing.T) { s.register <- p s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.consensus.(*fakeConsensus).txs, tx) + require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) }) t.Run("bad", func(t *testing.T) { tx := newDummyTx() s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.testHandleMessage(t, nil, CMDTX, tx) - for _, ftx := range s.consensus.(*fakeConsensus).txs { + for _, ftx := range s.services[0].(*fakeConsensus).txs { require.NotEqual(t, ftx, tx) } }) @@ -904,13 +893,13 @@ func TestRequestTx(t *testing.T) { t.Run("no hashes, no message", func(t *testing.T) { actual = nil - s.requestTx() + s.RequestTx() require.Nil(t, actual) }) t.Run("good, small", func(t *testing.T) { actual = nil expected := []util.Uint256{random.Uint256(), random.Uint256()} - s.requestTx(expected...) + s.RequestTx(expected...) require.Equal(t, expected, actual) }) t.Run("good, exactly one chunk", func(t *testing.T) { @@ -919,7 +908,7 @@ func TestRequestTx(t *testing.T) { for i := range expected { expected[i] = random.Uint256() } - s.requestTx(expected...) + s.RequestTx(expected...) require.Equal(t, expected, actual) }) t.Run("good, multiple chunks", func(t *testing.T) { @@ -928,7 +917,7 @@ func TestRequestTx(t *testing.T) { for i := range expected { expected[i] = random.Uint256() } - s.requestTx(expected...) + s.RequestTx(expected...) require.Equal(t, expected, actual) }) }