network: drop consensus dependency

Roman Khimov 2022-01-12 23:04:07 +03:00
parent 66aafd868b
commit 508d36f698
5 changed files with 71 additions and 51 deletions
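
The point of the change is to invert the dependency: pkg/network no longer imports pkg/consensus or constructs the consensus service itself. Callers build the service and hand it to the server through the new AddExtensibleHPService hook, and RequestTx is exported so consensus can ask peers for transactions. A minimal wiring sketch based on the mkConsensus helper added in this commit (the wireConsensus name is hypothetical; this is an illustration, not the exact cli code):

    // wireConsensus attaches dBFT consensus to an already-created network.Server
    // the way this commit does it from the CLI side (sketch under the assumptions above).
    func wireConsensus(chain *core.Blockchain, netSrv *network.Server, cfg network.ServerConfig, log *zap.Logger) (consensus.Service, error) {
        if cfg.Wallet == nil {
            return nil, nil // not a validator node, nothing to wire
        }
        cons, err := consensus.NewService(consensus.Config{
            Logger:                log,
            Broadcast:             netSrv.BroadcastExtensible,
            Chain:                 chain,
            ProtocolConfiguration: chain.GetConfig(),
            RequestTx:             netSrv.RequestTx,
            Wallet:                cfg.Wallet,
            TimePerBlock:          cfg.TimePerBlock,
        })
        if err != nil {
            return nil, err
        }
        // Consensus payloads are high-priority extensibles; the tx callback lets
        // the server feed accepted transactions back to the service.
        netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
        return cons, nil
    }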


@@ -12,6 +12,7 @@ import (
     "github.com/nspcc-dev/neo-go/cli/input"
     "github.com/nspcc-dev/neo-go/pkg/config"
+    "github.com/nspcc-dev/neo-go/pkg/consensus"
     "github.com/nspcc-dev/neo-go/pkg/core"
     "github.com/nspcc-dev/neo-go/pkg/core/storage"
     "github.com/nspcc-dev/neo-go/pkg/core/transaction"
@@ -84,6 +85,17 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
     serverConfig := network.NewServerConfig(cfg)
     netSrv, err := network.NewServer(serverConfig, chain, zap.NewNop())
     require.NoError(t, err)
+    cons, err := consensus.NewService(consensus.Config{
+        Logger:                zap.NewNop(),
+        Broadcast:             netSrv.BroadcastExtensible,
+        Chain:                 chain,
+        ProtocolConfiguration: chain.GetConfig(),
+        RequestTx:             netSrv.RequestTx,
+        Wallet:                serverConfig.Wallet,
+        TimePerBlock:          serverConfig.TimePerBlock,
+    })
+    require.NoError(t, err)
+    netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
     go netSrv.Start(make(chan error, 1))
     rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger)
     errCh := make(chan error, 2)


@@ -9,6 +9,7 @@ import (
     "github.com/nspcc-dev/neo-go/cli/options"
     "github.com/nspcc-dev/neo-go/pkg/config"
+    "github.com/nspcc-dev/neo-go/pkg/consensus"
     "github.com/nspcc-dev/neo-go/pkg/core"
     "github.com/nspcc-dev/neo-go/pkg/core/block"
     "github.com/nspcc-dev/neo-go/pkg/core/chaindump"
@@ -338,6 +339,27 @@ func mkOracle(config network.ServerConfig, chain *core.Blockchain, serv *network
     return orc, nil
 }
 
+func mkConsensus(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (consensus.Service, error) {
+    if config.Wallet == nil {
+        return nil, nil
+    }
+    srv, err := consensus.NewService(consensus.Config{
+        Logger:                log,
+        Broadcast:             serv.BroadcastExtensible,
+        Chain:                 chain,
+        ProtocolConfiguration: chain.GetConfig(),
+        RequestTx:             serv.RequestTx,
+        Wallet:                config.Wallet,
+        TimePerBlock:          config.TimePerBlock,
+    })
+    if err != nil {
+        return nil, fmt.Errorf("can't initialize Consensus module: %w", err)
+    }
+    serv.AddExtensibleHPService(srv, consensus.Category, srv.OnPayload, srv.OnTransaction)
+    return srv, nil
+}
+
 func startServer(ctx *cli.Context) error {
     cfg, err := getConfigFromContext(ctx)
     if err != nil {
@@ -372,6 +394,10 @@ func startServer(ctx *cli.Context) error {
     if err != nil {
         return err
     }
+    _, err = mkConsensus(serverConfig, chain, serv, log)
+    if err != nil {
+        return err
+    }
     rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log)
     errChan := make(chan error)


@@ -10,6 +10,7 @@ import (
     "github.com/nspcc-dev/neo-go/internal/fakechain"
     "github.com/nspcc-dev/neo-go/pkg/config"
+    "github.com/nspcc-dev/neo-go/pkg/consensus"
     "github.com/nspcc-dev/neo-go/pkg/io"
     "github.com/nspcc-dev/neo-go/pkg/network/capability"
     "github.com/nspcc-dev/neo-go/pkg/network/payload"
@@ -193,8 +194,12 @@ func newTestServer(t *testing.T, serverConfig ServerConfig) *Server {
 func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protocolCfg func(*config.ProtocolConfiguration)) *Server {
     s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), zaptest.NewLogger(t),
-        newFakeTransp, newFakeConsensus, newTestDiscovery)
+        newFakeTransp, newTestDiscovery)
     require.NoError(t, err)
+    if serverConfig.Wallet != nil {
+        cons := new(fakeConsensus)
+        s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
+    }
     t.Cleanup(s.discovery.Close)
     return s
 }


@@ -13,7 +13,6 @@ import (
     "time"
 
     "github.com/nspcc-dev/neo-go/pkg/config/netmode"
-    "github.com/nspcc-dev/neo-go/pkg/consensus"
     "github.com/nspcc-dev/neo-go/pkg/core/block"
     "github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
     "github.com/nspcc-dev/neo-go/pkg/core/mempool"
@@ -75,7 +74,6 @@ type (
         chain             blockchainer.Blockchainer
         bQueue            *blockQueue
         bSyncQueue        *blockQueue
-        consensus         consensus.Service
         mempool           *mempool.Pool
         notaryRequestPool *mempool.Pool
         extensiblePool    *extpool.Pool
@@ -83,6 +81,8 @@ type (
         notaryModule   *notary.Notary
         services       []Service
         extensHandlers map[string]func(*payload.Extensible) error
+        extensHighPrio string
+        txCallback     func(*transaction.Transaction)
 
         txInLock sync.Mutex
         txInMap  map[util.Uint256]struct{}
@@ -124,12 +124,11 @@ func randomID() uint32 {
 func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Logger) (*Server, error) {
     return newServerFromConstructors(config, chain, log, func(s *Server) Transporter {
         return NewTCPTransport(s, net.JoinHostPort(s.ServerConfig.Address, strconv.Itoa(int(s.ServerConfig.Port))), s.log)
-    }, consensus.NewService, newDefaultDiscovery)
+    }, newDefaultDiscovery)
 }
 
 func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Logger,
     newTransport func(*Server) Transporter,
-    newConsensus func(consensus.Config) (consensus.Service, error),
     newDiscovery func([]string, time.Duration, Transporter) Discoverer,
 ) (*Server, error) {
     if log == nil {
@@ -198,25 +197,6 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
     s.stateSync = sSync
     s.bSyncQueue = newBlockQueue(maxBlockBatch, sSync, log, nil)
 
-    if config.Wallet != nil {
-        srv, err := newConsensus(consensus.Config{
-            Logger:                log,
-            Broadcast:             s.BroadcastExtensible,
-            Chain:                 chain,
-            ProtocolConfiguration: chain.GetConfig(),
-            RequestTx:             s.requestTx,
-            Wallet:                config.Wallet,
-            TimePerBlock:          config.TimePerBlock,
-        })
-        if err != nil {
-            return nil, err
-        }
-        s.consensus = srv
-        s.AddExtensibleService(srv, consensus.Category, srv.OnPayload)
-    }
-
     if s.MinPeers < 0 {
         s.log.Info("bad MinPeers configured, using the default value",
             zap.Int("configured", s.MinPeers),
@@ -301,6 +281,13 @@ func (s *Server) AddExtensibleService(svc Service, category string, handler func
     s.AddService(svc)
 }
 
+// AddExtensibleHPService registers a high-priority service that handles extensible payload of some kind.
+func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) {
+    s.txCallback = txCallback
+    s.extensHighPrio = category
+    s.AddExtensibleService(svc, category, handler)
+}
+
 // UnconnectedPeers returns a list of peers that are in the discovery peer list
 // but are not connected to the server.
 func (s *Server) UnconnectedPeers() []string {
@@ -949,7 +936,7 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
 func (s *Server) advertiseExtensible(e *payload.Extensible) {
     msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()}))
-    if e.Category == consensus.Category {
+    if e.Category == s.extensHighPrio {
         // It's high priority because it directly affects consensus process,
         // even though it's just an inv.
         s.broadcastHPMessage(msg)
@@ -972,8 +959,8 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
     s.txInMap[tx.Hash()] = struct{}{}
     s.txInLock.Unlock()
     if s.verifyAndPoolTX(tx) == nil {
-        if s.consensus != nil {
-            s.consensus.OnTransaction(tx)
+        if s.txCallback != nil {
+            s.txCallback(tx)
         }
         s.broadcastTX(tx, nil)
     }
@@ -1255,7 +1242,8 @@ func (s *Server) BroadcastExtensible(p *payload.Extensible) {
     s.advertiseExtensible(p)
 }
 
-func (s *Server) requestTx(hashes ...util.Uint256) {
+// RequestTx asks for given transactions from Server peers using GetData message.
+func (s *Server) RequestTx(hashes ...util.Uint256) {
     if len(hashes) == 0 {
         return
     }


@@ -26,7 +26,6 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "go.uber.org/atomic"
-    "go.uber.org/zap/zaptest"
 )
 
 type fakeConsensus struct {
@@ -38,9 +37,6 @@ type fakeConsensus struct {
 
 var _ consensus.Service = (*fakeConsensus)(nil)
 
-func newFakeConsensus(c consensus.Config) (consensus.Service, error) {
-    return new(fakeConsensus), nil
-}
 
 func (f *fakeConsensus) Start()    { f.started.Store(true) }
 func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) }
 func (f *fakeConsensus) OnPayload(p *payload.Extensible) error {
@@ -55,7 +51,7 @@ func TestNewServer(t *testing.T) {
         P2PStateExchangeExtensions: true,
         StateRootInHeader:          true,
     }}
-    s, err := newServerFromConstructors(ServerConfig{}, bc, nil, newFakeTransp, newFakeConsensus, newTestDiscovery)
+    s, err := newServerFromConstructors(ServerConfig{}, bc, nil, newFakeTransp, newTestDiscovery)
     require.Error(t, err)
 
     t.Run("set defaults", func(t *testing.T) {
@@ -79,13 +75,6 @@ func TestNewServer(t *testing.T) {
         require.Equal(t, 2, s.ServerConfig.MaxPeers)
         require.Equal(t, 3, s.ServerConfig.AttemptConnPeers)
     })
-    t.Run("consensus error is not dropped", func(t *testing.T) {
-        errConsensus := errors.New("can't create consensus")
-        _, err = newServerFromConstructors(ServerConfig{Wallet: new(config.Wallet), MinPeers: -1}, bc, zaptest.NewLogger(t), newFakeTransp,
-            func(consensus.Config) (consensus.Service, error) { return nil, errConsensus },
-            newTestDiscovery)
-        require.True(t, errors.Is(err, errConsensus), "got: %#v", err)
-    })
 }
 
 func startWithChannel(s *Server) chan error {
@@ -107,7 +96,7 @@ func TestServerStartAndShutdown(t *testing.T) {
         require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)
 
         assert.True(t, s.transport.(*fakeTransp).started.Load())
-        assert.Nil(t, s.consensus)
+        assert.Nil(t, s.txCallback)
 
         s.Shutdown()
         <-ch
@@ -124,12 +113,12 @@ func TestServerStartAndShutdown(t *testing.T) {
         p := newLocalPeer(t, s)
         s.register <- p
 
-        assert.True(t, s.consensus.(*fakeConsensus).started.Load())
+        assert.True(t, s.services[0].(*fakeConsensus).started.Load())
 
         s.Shutdown()
         <-ch
 
-        require.True(t, s.consensus.(*fakeConsensus).stopped.Load())
+        require.True(t, s.services[0].(*fakeConsensus).stopped.Load())
     })
 }
@@ -441,13 +430,13 @@ func TestConsensus(t *testing.T) {
     s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil }
     require.NoError(t, s.handleMessage(p, msg))
-    require.Contains(t, s.consensus.(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible))
+    require.Contains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible))
 
     t.Run("small ValidUntilBlockEnd", func(t *testing.T) {
         t.Run("current height", func(t *testing.T) {
             msg := newConsensusMessage(0, s.chain.BlockHeight())
             require.NoError(t, s.handleMessage(p, msg))
-            require.NotContains(t, s.consensus.(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible))
+            require.NotContains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible))
         })
         t.Run("invalid", func(t *testing.T) {
             msg := newConsensusMessage(0, s.chain.BlockHeight()-1)
@@ -478,13 +467,13 @@ func TestTransaction(t *testing.T) {
         s.register <- p
 
         s.testHandleMessage(t, nil, CMDTX, tx)
-        require.Contains(t, s.consensus.(*fakeConsensus).txs, tx)
+        require.Contains(t, s.services[0].(*fakeConsensus).txs, tx)
     })
     t.Run("bad", func(t *testing.T) {
         tx := newDummyTx()
         s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
         s.testHandleMessage(t, nil, CMDTX, tx)
-        for _, ftx := range s.consensus.(*fakeConsensus).txs {
+        for _, ftx := range s.services[0].(*fakeConsensus).txs {
             require.NotEqual(t, ftx, tx)
         }
     })
@@ -904,13 +893,13 @@ func TestRequestTx(t *testing.T) {
     t.Run("no hashes, no message", func(t *testing.T) {
         actual = nil
-        s.requestTx()
+        s.RequestTx()
         require.Nil(t, actual)
     })
     t.Run("good, small", func(t *testing.T) {
         actual = nil
         expected := []util.Uint256{random.Uint256(), random.Uint256()}
-        s.requestTx(expected...)
+        s.RequestTx(expected...)
         require.Equal(t, expected, actual)
     })
     t.Run("good, exactly one chunk", func(t *testing.T) {
@@ -919,7 +908,7 @@ func TestRequestTx(t *testing.T) {
         for i := range expected {
             expected[i] = random.Uint256()
         }
-        s.requestTx(expected...)
+        s.RequestTx(expected...)
         require.Equal(t, expected, actual)
     })
     t.Run("good, multiple chunks", func(t *testing.T) {
@@ -928,7 +917,7 @@ func TestRequestTx(t *testing.T) {
         for i := range expected {
             expected[i] = random.Uint256()
         }
-        s.requestTx(expected...)
+        s.RequestTx(expected...)
         require.Equal(t, expected, actual)
     })
 }