cli: restart consensus service on USR2

Fix #1949. Also drop wallet from the ServerConfig since it's not used in any
meaningful way after this change.
This commit is contained in:
Roman Khimov 2022-07-26 22:41:52 +03:00
parent bf92966633
commit 5a7fa2d3df
6 changed files with 30 additions and 22 deletions

View file

@ -147,7 +147,7 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
Chain: chain, Chain: chain,
ProtocolConfiguration: chain.GetConfig(), ProtocolConfiguration: chain.GetConfig(),
RequestTx: netSrv.RequestTx, RequestTx: netSrv.RequestTx,
Wallet: serverConfig.Wallet, Wallet: &cfg.ApplicationConfiguration.UnlockWallet,
TimePerBlock: serverConfig.TimePerBlock, TimePerBlock: serverConfig.TimePerBlock,
}) })
require.NoError(t, err) require.NoError(t, err)

View file

@ -9,6 +9,7 @@ import (
"os/signal" "os/signal"
"runtime" "runtime"
"syscall" "syscall"
"time"
"github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
@ -406,8 +407,8 @@ func mkOracle(config config.OracleConfiguration, magic netmode.Magic, chain *cor
return orc, nil return orc, nil
} }
func mkConsensus(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (consensus.Service, error) { func mkConsensus(config config.Wallet, tpb time.Duration, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (consensus.Service, error) {
if config.Wallet == nil { if len(config.Path) == 0 {
return nil, nil return nil, nil
} }
srv, err := consensus.NewService(consensus.Config{ srv, err := consensus.NewService(consensus.Config{
@ -416,8 +417,8 @@ func mkConsensus(config network.ServerConfig, chain *core.Blockchain, serv *netw
Chain: chain, Chain: chain,
ProtocolConfiguration: chain.GetConfig(), ProtocolConfiguration: chain.GetConfig(),
RequestTx: serv.RequestTx, RequestTx: serv.RequestTx,
Wallet: config.Wallet, Wallet: &config,
TimePerBlock: config.TimePerBlock, TimePerBlock: tpb,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("can't initialize Consensus module: %w", err) return nil, fmt.Errorf("can't initialize Consensus module: %w", err)
@ -497,7 +498,7 @@ func startServer(ctx *cli.Context) error {
if err != nil { if err != nil {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
_, err = mkConsensus(serverConfig, chain, serv, log) dbftSrv, err := mkConsensus(cfg.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log)
if err != nil { if err != nil {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
@ -517,6 +518,7 @@ func startServer(ctx *cli.Context) error {
sigCh := make(chan os.Signal, 1) sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP) signal.Notify(sigCh, syscall.SIGHUP)
signal.Notify(sigCh, syscall.SIGUSR1) signal.Notify(sigCh, syscall.SIGUSR1)
signal.Notify(sigCh, syscall.SIGUSR2)
fmt.Fprintln(ctx.App.Writer, Logo()) fmt.Fprintln(ctx.App.Writer, Logo())
fmt.Fprintln(ctx.App.Writer, serv.UserAgent) fmt.Fprintln(ctx.App.Writer, serv.UserAgent)
@ -599,6 +601,18 @@ Main:
if serv.IsInSync() { if serv.IsInSync() {
sr.Start() sr.Start()
} }
case syscall.SIGUSR2:
if dbftSrv != nil {
dbftSrv.Shutdown()
}
dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log)
if err != nil {
log.Error("failed to create consensus service", zap.Error(err))
break // Whatever happens, I'll leave it all to chance.
}
if dbftSrv != nil && serv.IsInSync() {
dbftSrv.Start()
}
} }
cfg = cfgnew cfg = cfgnew
case <-grace.Done(): case <-grace.Done():

View file

@ -276,6 +276,7 @@ func (s *service) Start() {
// Shutdown implements the Service interface. // Shutdown implements the Service interface.
func (s *service) Shutdown() { func (s *service) Shutdown() {
if s.started.CAS(true, false) { if s.started.CAS(true, false) {
s.log.Info("stopping consensus service")
close(s.quit) close(s.quit)
<-s.finished <-s.finished
} }

View file

@ -196,10 +196,6 @@ func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protoco
s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t), s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t),
newFakeTransp, newTestDiscovery) newFakeTransp, newTestDiscovery)
require.NoError(t, err) require.NoError(t, err)
if serverConfig.Wallet != nil {
cons := new(fakeConsensus)
s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
}
t.Cleanup(s.discovery.Close) t.Cleanup(s.discovery.Close)
return s return s
} }

View file

@ -64,9 +64,6 @@ type (
// Level of the internal logger. // Level of the internal logger.
LogLevel zapcore.Level LogLevel zapcore.Level
// Wallet is a wallet configuration.
Wallet *config.Wallet
// TimePerBlock is an interval which should pass between two successive blocks. // TimePerBlock is an interval which should pass between two successive blocks.
TimePerBlock time.Duration TimePerBlock time.Duration
@ -90,11 +87,6 @@ func NewServerConfig(cfg config.Config) ServerConfig {
appConfig := cfg.ApplicationConfiguration appConfig := cfg.ApplicationConfiguration
protoConfig := cfg.ProtocolConfiguration protoConfig := cfg.ProtocolConfiguration
var wc *config.Wallet
if appConfig.UnlockWallet.Path != "" {
wc = &appConfig.UnlockWallet
}
return ServerConfig{ return ServerConfig{
UserAgent: cfg.GenerateUserAgent(), UserAgent: cfg.GenerateUserAgent(),
Address: appConfig.Address, Address: appConfig.Address,
@ -110,7 +102,6 @@ func NewServerConfig(cfg config.Config) ServerConfig {
MaxPeers: appConfig.MaxPeers, MaxPeers: appConfig.MaxPeers,
AttemptConnPeers: appConfig.AttemptConnPeers, AttemptConnPeers: appConfig.AttemptConnPeers,
MinPeers: appConfig.MinPeers, MinPeers: appConfig.MinPeers,
Wallet: wc,
TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second, TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second,
OracleCfg: appConfig.Oracle, OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary, P2PNotaryCfg: appConfig.P2PNotary,

View file

@ -109,7 +109,9 @@ func TestServerStartAndShutdown(t *testing.T) {
require.True(t, errors.Is(err, errServerShutdown)) require.True(t, errors.Is(err, errServerShutdown))
}) })
t.Run("with consensus", func(t *testing.T) { t.Run("with consensus", func(t *testing.T) {
s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) s := newTestServer(t, ServerConfig{})
cons := new(fakeConsensus)
s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
ch := startWithChannel(s) ch := startWithChannel(s)
p := newLocalPeer(t, s) p := newLocalPeer(t, s)
@ -409,7 +411,9 @@ func TestBlock(t *testing.T) {
} }
func TestConsensus(t *testing.T) { func TestConsensus(t *testing.T) {
s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) s := newTestServer(t, ServerConfig{})
cons := new(fakeConsensus)
s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
startWithCleanup(t, s) startWithCleanup(t, s)
atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4)
@ -452,7 +456,9 @@ func TestConsensus(t *testing.T) {
} }
func TestTransaction(t *testing.T) { func TestTransaction(t *testing.T) {
s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) s := newTestServer(t, ServerConfig{})
cons := new(fakeConsensus)
s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
startWithCleanup(t, s) startWithCleanup(t, s)
t.Run("good", func(t *testing.T) { t.Run("good", func(t *testing.T) {