network/services: unify service lifecycle management

Run with Start, Stop with Shutdown, make behavior uniform.
This commit is contained in:
Roman Khimov 2022-01-12 04:11:21 +03:00
parent c942402957
commit 5dd4db2c02
11 changed files with 65 additions and 60 deletions

View file

@ -129,10 +129,6 @@ func NewService(cfg Config) (Service, error) {
finished: make(chan struct{}),
}
if cfg.Wallet == nil {
return srv, nil
}
var err error
if srv.wallet, err = wallet.NewWalletFromFile(cfg.Wallet.Path); err != nil {

View file

@ -16,8 +16,8 @@ type Oracle interface {
UpdateOracleNodes(keys.PublicKeys)
// UpdateNativeContract updates oracle contract native script and hash.
UpdateNativeContract([]byte, []byte, util.Uint160, int)
// Run runs oracle module. Must be invoked in a separate goroutine.
Run()
// Start runs oracle module.
Start()
// Shutdown shutdowns oracle module.
Shutdown()
}

View file

@ -140,9 +140,9 @@ func TestNotary(t *testing.T) {
})
mp1.RunSubscriptions()
go ntr1.Run()
ntr1.Start()
t.Cleanup(func() {
ntr1.Stop()
ntr1.Shutdown()
mp1.StopSubscriptions()
})

View file

@ -305,7 +305,7 @@ func TestOracleFull(t *testing.T) {
require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, cs))
go bc.Run()
go orc.Run()
orc.Start()
t.Cleanup(orc.Shutdown)
bc.setNodesByRole(t, true, noderoles.Oracle, keys.PublicKeys{acc.PrivateKey().PublicKey()})
@ -351,7 +351,7 @@ func TestNotYetRunningOracle(t *testing.T) {
ids = []uint64{3}
orc.RemoveRequests(ids) // 3 removed from pending -> 2, 4 in pending.
go orc.Run()
orc.Start()
t.Cleanup(orc.Shutdown)
require.Eventually(t, func() bool { return mp.Count() == 2 },

View file

@ -199,7 +199,7 @@ func TestStateRootFull(t *testing.T) {
lastValidated.Store(ep)
})
require.NoError(t, err)
srv.Run()
srv.Start()
t.Cleanup(srv.Shutdown)
bc.setNodesByRole(t, true, noderoles.StateValidator, pubs)

View file

@ -52,6 +52,12 @@ var (
)
type (
// Service is a service abstraction (oracle, state root, consensus, etc).
Service interface {
Start()
Shutdown()
}
// Server represents the local Node in the network. Its transport could
// be of any kind.
Server struct {
@ -77,6 +83,7 @@ type (
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
notaryModule *notary.Notary
services []Service
txInLock sync.Mutex
txInMap map[util.Uint256]struct{}
@ -179,6 +186,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
return nil, fmt.Errorf("failed to create Notary module: %w", err)
}
s.notaryModule = n
s.services = append(s.services, n)
chain.SetNotary(n)
}
} else if config.P2PNotaryCfg.Enabled {
@ -197,6 +205,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
return nil, fmt.Errorf("can't initialize StateRoot service: %w", err)
}
s.stateRoot = sr
s.services = append(s.services, sr)
sSync := chain.GetStateSyncModule()
s.stateSync = sSync
@ -221,9 +230,11 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
}
})
s.oracle = orc
s.services = append(s.services, orc)
chain.SetOracle(orc)
}
if config.Wallet != nil {
srv, err := newConsensus(consensus.Config{
Logger: log,
Broadcast: s.handleNewPayload,
@ -239,6 +250,8 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
}
s.consensus = srv
s.services = append(s.services, srv)
}
if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
@ -299,20 +312,13 @@ func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
s.transport.Close()
s.discovery.Close()
s.consensus.Shutdown()
for _, p := range s.getPeers(nil) {
p.Disconnect(errServerShutdown)
}
s.bQueue.discard()
s.bSyncQueue.discard()
if s.StateRootCfg.Enabled {
s.stateRoot.Shutdown()
}
if s.oracle != nil {
s.oracle.Shutdown()
}
if s.notaryModule != nil {
s.notaryModule.Stop()
for _, svc := range s.services {
svc.Shutdown()
}
if s.chain.P2PSigExtensionsEnabled() {
s.notaryRequestPool.StopSubscriptions()
@ -460,20 +466,11 @@ func (s *Server) tryStartServices() {
if s.IsInSync() && s.syncReached.CAS(false, true) {
s.log.Info("node reached synchronized state, starting services")
if s.Wallet != nil {
s.consensus.Start()
}
if s.StateRootCfg.Enabled {
s.stateRoot.Run()
}
if s.oracle != nil {
go s.oracle.Run()
}
if s.chain.P2PSigExtensionsEnabled() {
s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber.
}
if s.notaryModule != nil {
go s.notaryModule.Run()
for _, svc := range s.services {
svc.Start()
}
}
}
@ -976,7 +973,9 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
}
switch e.Category {
case consensus.Category:
if s.consensus != nil {
s.consensus.OnPayload(e)
}
case stateroot.Category:
err := s.stateRoot.OnPayload(e)
if err != nil {
@ -1009,7 +1008,9 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
s.txInMap[tx.Hash()] = struct{}{}
s.txInLock.Unlock()
if s.verifyAndPoolTX(tx) == nil {
if s.consensus != nil {
s.consensus.OnTransaction(tx)
}
s.broadcastTX(tx, nil)
}
s.txInLock.Lock()

View file

@ -78,7 +78,7 @@ func TestNewServer(t *testing.T) {
})
t.Run("consensus error is not dropped", func(t *testing.T) {
errConsensus := errors.New("can't create consensus")
_, err = newServerFromConstructors(ServerConfig{MinPeers: -1}, bc, zaptest.NewLogger(t), newFakeTransp,
_, err = newServerFromConstructors(ServerConfig{Wallet: new(config.Wallet), MinPeers: -1}, bc, zaptest.NewLogger(t), newFakeTransp,
func(consensus.Config) (consensus.Service, error) { return nil, errConsensus },
newTestDiscovery)
require.True(t, errors.Is(err, errConsensus), "got: %#v", err)
@ -104,13 +104,12 @@ func TestServerStartAndShutdown(t *testing.T) {
require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)
assert.True(t, s.transport.(*fakeTransp).started.Load())
assert.False(t, s.consensus.(*fakeConsensus).started.Load())
assert.Nil(t, s.consensus)
s.Shutdown()
<-ch
require.True(t, s.transport.(*fakeTransp).closed.Load())
require.True(t, s.consensus.(*fakeConsensus).stopped.Load())
err, ok := p.droppedWith.Load().(error)
require.True(t, ok)
require.True(t, errors.Is(err, errServerShutdown))
@ -416,7 +415,8 @@ func TestBlock(t *testing.T) {
}
func TestConsensus(t *testing.T) {
s := startTestServer(t)
s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)})
startWithCleanup(t, s)
atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4)
p := newLocalPeer(t, s)
@ -465,7 +465,8 @@ func TestConsensus(t *testing.T) {
}
func TestTransaction(t *testing.T) {
s := startTestServer(t)
s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)})
startWithCleanup(t, s)
t.Run("good", func(t *testing.T) {
tx := newDummyTx()

View file

@ -143,12 +143,16 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
}, nil
}
// Run runs Notary module and should be called in a separate goroutine.
func (n *Notary) Run() {
// Start runs Notary module in a separate goroutine.
func (n *Notary) Start() {
n.Config.Log.Info("starting notary service")
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
go n.newTxCallbackLoop()
go n.mainLoop()
}
func (n *Notary) mainLoop() {
for {
select {
case <-n.stopCh:
@ -171,8 +175,8 @@ func (n *Notary) Run() {
}
}
// Stop shutdowns Notary module.
func (n *Notary) Stop() {
// Shutdown stops Notary module.
func (n *Notary) Shutdown() {
close(n.stopCh)
}

View file

@ -170,15 +170,18 @@ func (o *Oracle) Shutdown() {
o.getBroadcaster().Shutdown()
}
// Run runs must be executed in a separate goroutine.
func (o *Oracle) Run() {
// Start runs the oracle service in a separate goroutine.
func (o *Oracle) Start() {
o.respMtx.Lock()
if o.running {
o.respMtx.Unlock()
return
}
o.Log.Info("starting oracle service")
go o.start()
}
func (o *Oracle) start() {
o.requestMap <- o.pending // Guaranteed to not block, only AddRequests sends to it.
o.pending = nil
o.running = true

View file

@ -25,7 +25,7 @@ type (
OnPayload(p *payload.Extensible) error
AddSignature(height uint32, validatorIndex int32, sig []byte) error
GetConfig() config.StateRoot
Run()
Start()
Shutdown()
}

View file

@ -17,8 +17,8 @@ const (
firstVoteResendDelay = 3 * time.Second
)
// Run runs service instance in a separate goroutine.
func (s *service) Run() {
// Start runs service instance in a separate goroutine.
func (s *service) Start() {
s.log.Info("starting state validation service")
s.chain.SubscribeForBlocks(s.blockCh)
go s.run()