From 3fca3352d817cf5ebc482892400d43d4aa9f4252 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 15:18:30 +0300 Subject: [PATCH 01/14] cli: read new config on signal and check ProtocolConfiguration ProtocolConfiguration must remain the same, any errors mean that the signal will be ignored. --- cli/server/server.go | 11 ++++- pkg/config/protocol_config.go | 75 ++++++++++++++++++++++++++++++ pkg/config/protocol_config_test.go | 74 +++++++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 1 deletion(-) diff --git a/cli/server/server.go b/cli/server/server.go index c772e1927..5f80bf42b 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -528,9 +528,18 @@ Main: shutdownErr = fmt.Errorf("server error: %w", err) cancel() case sig := <-sighupCh: + log.Info("signal received", zap.Stringer("name", sig)) + cfgnew, err := getConfigFromContext(ctx) + if err != nil { + log.Warn("can't reread the config file, signal ignored", zap.Error(err)) + break // Continue working. + } + if !cfg.ProtocolConfiguration.Equals(&cfgnew.ProtocolConfiguration) { + log.Warn("ProtocolConfiguration changed, signal ignored") + break // Continue working. + } switch sig { case syscall.SIGHUP: - log.Info("SIGHUP received, restarting rpc-server") rpcServer.Shutdown() rpcServer = rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) serv.AddService(&rpcServer) // Replaces old one by service name. diff --git a/pkg/config/protocol_config.go b/pkg/config/protocol_config.go index ab4c60cfd..5cfc743df 100644 --- a/pkg/config/protocol_config.go +++ b/pkg/config/protocol_config.go @@ -196,3 +196,78 @@ func (p *ProtocolConfiguration) GetNumOfCNs(height uint32) int { func (p *ProtocolConfiguration) ShouldUpdateCommitteeAt(height uint32) bool { return height%uint32(p.GetCommitteeSize(height)) == 0 } + +// Equals allows to compare two ProtocolConfiguration instances, returns true if +// they're equal. +func (p *ProtocolConfiguration) Equals(o *ProtocolConfiguration) bool { + if p.GarbageCollectionPeriod != o.GarbageCollectionPeriod || + p.InitialGASSupply != o.InitialGASSupply || + p.KeepOnlyLatestState != o.KeepOnlyLatestState || + p.Magic != o.Magic || + p.MaxBlockSize != o.MaxBlockSize || + p.MaxBlockSystemFee != o.MaxBlockSystemFee || + p.MaxTraceableBlocks != o.MaxTraceableBlocks || + p.MaxTransactionsPerBlock != o.MaxTransactionsPerBlock || + p.MaxValidUntilBlockIncrement != o.MaxValidUntilBlockIncrement || + p.MemPoolSize != o.MemPoolSize || + p.P2PNotaryRequestPayloadPoolSize != o.P2PNotaryRequestPayloadPoolSize || + p.P2PSigExtensions != o.P2PSigExtensions || + p.P2PStateExchangeExtensions != o.P2PStateExchangeExtensions || + p.RemoveUntraceableBlocks != o.RemoveUntraceableBlocks || + p.ReservedAttributes != o.ReservedAttributes || + p.SaveStorageBatch != o.SaveStorageBatch || + p.SecondsPerBlock != o.SecondsPerBlock || + p.StateRootInHeader != o.StateRootInHeader || + p.StateSyncInterval != o.StateSyncInterval || + p.ValidatorsCount != o.ValidatorsCount || + p.VerifyBlocks != o.VerifyBlocks || + p.VerifyTransactions != o.VerifyTransactions || + len(p.CommitteeHistory) != len(o.CommitteeHistory) || + len(p.Hardforks) != len(o.Hardforks) || + len(p.NativeUpdateHistories) != len(o.NativeUpdateHistories) || + len(p.SeedList) != len(o.SeedList) || + len(p.StandbyCommittee) != len(o.StandbyCommittee) || + len(p.ValidatorsHistory) != len(o.ValidatorsHistory) { + return false + } + for k, v := range p.CommitteeHistory { + vo, ok := o.CommitteeHistory[k] + if !ok || v != vo { + return false + } + } + for k, v := range p.Hardforks { + vo, ok := o.Hardforks[k] + if !ok || v != vo { + return false + } + } + for k, v := range p.NativeUpdateHistories { + vo := o.NativeUpdateHistories[k] + if len(v) != len(vo) { + return false + } + for i := range v { + if v[i] != vo[i] { + return false + } + } + } + for i := range p.SeedList { + if p.SeedList[i] != o.SeedList[i] { + return false + } + } + for i := range p.StandbyCommittee { + if p.StandbyCommittee[i] != o.StandbyCommittee[i] { + return false + } + } + for k, v := range p.ValidatorsHistory { + vo, ok := o.ValidatorsHistory[k] + if !ok || v != vo { + return false + } + } + return true +} diff --git a/pkg/config/protocol_config_test.go b/pkg/config/protocol_config_test.go index f95898f45..a2e976099 100644 --- a/pkg/config/protocol_config_test.go +++ b/pkg/config/protocol_config_test.go @@ -1,6 +1,7 @@ package config import ( + "path/filepath" "testing" "github.com/stretchr/testify/require" @@ -143,3 +144,76 @@ func TestGetCommitteeAndCNs(t *testing.T) { require.Equal(t, 4, p.GetNumOfCNs(200)) require.Equal(t, 4, p.GetNumOfCNs(201)) } + +func TestProtocolConfigurationEquals(t *testing.T) { + p := &ProtocolConfiguration{} + o := &ProtocolConfiguration{} + require.True(t, p.Equals(o)) + require.True(t, o.Equals(p)) + require.True(t, p.Equals(p)) + + cfg1, err := LoadFile(filepath.Join("..", "..", "config", "protocol.mainnet.yml")) + require.NoError(t, err) + cfg2, err := LoadFile(filepath.Join("..", "..", "config", "protocol.testnet.yml")) + require.NoError(t, err) + require.False(t, cfg1.ProtocolConfiguration.Equals(&cfg2.ProtocolConfiguration)) + + cfg2, err = LoadFile(filepath.Join("..", "..", "config", "protocol.mainnet.yml")) + require.NoError(t, err) + p = &cfg1.ProtocolConfiguration + o = &cfg2.ProtocolConfiguration + require.True(t, p.Equals(o)) + + o.CommitteeHistory = map[uint32]int{111: 7} + p.CommitteeHistory = map[uint32]int{111: 7} + require.True(t, p.Equals(o)) + p.CommitteeHistory[111] = 8 + require.False(t, p.Equals(o)) + + o.CommitteeHistory = nil + p.CommitteeHistory = nil + + p.Hardforks = map[string]uint32{"Fork": 42} + o.Hardforks = map[string]uint32{"Fork": 42} + require.True(t, p.Equals(o)) + p.Hardforks = map[string]uint32{"Fork2": 42} + require.False(t, p.Equals(o)) + + p.Hardforks = nil + o.Hardforks = nil + + p.NativeUpdateHistories = map[string][]uint32{"Contract": {1, 2, 3}} + o.NativeUpdateHistories = map[string][]uint32{"Contract": {1, 2, 3}} + require.True(t, p.Equals(o)) + p.NativeUpdateHistories["Contract"] = []uint32{1, 2, 3, 4} + require.False(t, p.Equals(o)) + p.NativeUpdateHistories["Contract"] = []uint32{1, 2, 4} + require.False(t, p.Equals(o)) + + p.NativeUpdateHistories = nil + o.NativeUpdateHistories = nil + + p.SeedList = []string{"url1", "url2"} + o.SeedList = []string{"url1", "url2"} + require.True(t, p.Equals(o)) + p.SeedList = []string{"url11", "url22"} + require.False(t, p.Equals(o)) + + p.SeedList = nil + o.SeedList = nil + + p.StandbyCommittee = []string{"key1", "key2"} + o.StandbyCommittee = []string{"key1", "key2"} + require.True(t, p.Equals(o)) + p.StandbyCommittee = []string{"key2", "key1"} + require.False(t, p.Equals(o)) + + p.StandbyCommittee = nil + o.StandbyCommittee = nil + + o.ValidatorsHistory = map[uint32]int{111: 0} + p.ValidatorsHistory = map[uint32]int{111: 0} + require.True(t, p.Equals(o)) + p.ValidatorsHistory = map[uint32]int{112: 0} + require.False(t, p.Equals(o)) +} From 94099de3c3ec01133cf03873e5a75089931670c6 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 16:16:48 +0300 Subject: [PATCH 02/14] cli: also check new ApplicationConfiguration for consistency Most of the settings can't be changed, only services can. --- cli/server/server.go | 4 ++++ pkg/config/application_config.go | 22 ++++++++++++++++++++++ pkg/config/application_config_test.go | 22 ++++++++++++++++++++++ 3 files changed, 48 insertions(+) create mode 100644 pkg/config/application_config_test.go diff --git a/cli/server/server.go b/cli/server/server.go index 5f80bf42b..e864642d4 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -538,6 +538,10 @@ Main: log.Warn("ProtocolConfiguration changed, signal ignored") break // Continue working. } + if !cfg.ApplicationConfiguration.EqualsButServices(&cfgnew.ApplicationConfiguration) { + log.Warn("ApplicationConfiguration changed in incompatible way, signal ignored") + break // Continue working. + } switch sig { case syscall.SIGHUP: rpcServer.Shutdown() diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index cbf7be3d1..ac6db77af 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -29,3 +29,25 @@ type ApplicationConfiguration struct { // ExtensiblePoolSize is the maximum amount of the extensible payloads from a single sender. ExtensiblePoolSize int `yaml:"ExtensiblePoolSize"` } + +// EqualsButServices returns true when the o is the same as a except for services +// (Oracle, P2PNotary, Pprof, Prometheus, RPC, StateRoot and UnlockWallet sections). +func (a *ApplicationConfiguration) EqualsButServices(o *ApplicationConfiguration) bool { + if a.Address != o.Address || + a.AnnouncedNodePort != o.AnnouncedNodePort || + a.AttemptConnPeers != o.AttemptConnPeers || + a.DBConfiguration != o.DBConfiguration || + a.DialTimeout != o.DialTimeout || + a.ExtensiblePoolSize != o.ExtensiblePoolSize || + a.LogPath != o.LogPath || + a.MaxPeers != o.MaxPeers || + a.MinPeers != o.MinPeers || + a.NodePort != o.NodePort || + a.PingInterval != o.PingInterval || + a.PingTimeout != o.PingTimeout || + a.ProtoTickInterval != o.ProtoTickInterval || + a.Relay != o.Relay { + return false + } + return true +} diff --git a/pkg/config/application_config_test.go b/pkg/config/application_config_test.go new file mode 100644 index 000000000..8b5428e8f --- /dev/null +++ b/pkg/config/application_config_test.go @@ -0,0 +1,22 @@ +package config + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestApplicationConfigurationEquals(t *testing.T) { + a := &ApplicationConfiguration{} + o := &ApplicationConfiguration{} + require.True(t, a.EqualsButServices(o)) + require.True(t, o.EqualsButServices(a)) + require.True(t, a.EqualsButServices(a)) + + cfg1, err := LoadFile(filepath.Join("..", "..", "config", "protocol.mainnet.yml")) + require.NoError(t, err) + cfg2, err := LoadFile(filepath.Join("..", "..", "config", "protocol.testnet.yml")) + require.NoError(t, err) + require.False(t, cfg1.ApplicationConfiguration.EqualsButServices(&cfg2.ApplicationConfiguration)) +} From 6593b94594b6de40e232db6f87ebfc13217a503c Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 16:19:51 +0300 Subject: [PATCH 03/14] cli: use new configuration for the RPC server Also fix addresses if needed and store this new configuration. --- cli/server/server.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index e864642d4..248ac67e8 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -542,15 +542,17 @@ Main: log.Warn("ApplicationConfiguration changed in incompatible way, signal ignored") break // Continue working. } + configureAddresses(&cfgnew.ApplicationConfiguration) switch sig { case syscall.SIGHUP: rpcServer.Shutdown() - rpcServer = rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) + rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) serv.AddService(&rpcServer) // Replaces old one by service name. - if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { + if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { rpcServer.Start() } } + cfg = cfgnew case <-grace.Done(): signal.Stop(sighupCh) serv.Shutdown() From df24c1268e9ba09ecd4939afbaeb66ba582057d1 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 17:19:30 +0300 Subject: [PATCH 04/14] cli: restart pprof and prometheus on HUP --- cli/server/server.go | 6 ++++++ pkg/services/metrics/metrics.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/cli/server/server.go b/cli/server/server.go index 248ac67e8..42b699bd8 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -551,6 +551,12 @@ Main: if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { rpcServer.Start() } + pprof.ShutDown() + pprof = metrics.NewPprofService(cfgnew.ApplicationConfiguration.Pprof, log) + go pprof.Start() + prometheus.ShutDown() + prometheus = metrics.NewPrometheusService(cfgnew.ApplicationConfiguration.Prometheus, log) + go prometheus.Start() } cfg = cfgnew case <-grace.Done(): diff --git a/pkg/services/metrics/metrics.go b/pkg/services/metrics/metrics.go index 071c1fbeb..b3cb4c27b 100644 --- a/pkg/services/metrics/metrics.go +++ b/pkg/services/metrics/metrics.go @@ -31,6 +31,9 @@ func (ms *Service) Start() { // ShutDown stops the service. func (ms *Service) ShutDown() { + if !ms.config.Enabled { + return + } ms.log.Info("shutting down service", zap.String("endpoint", ms.Addr)) err := ms.Shutdown(context.Background()) if err != nil { From 98e2c5568c440692e8c2eedcd0c98feacef5bf28 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 18:27:33 +0300 Subject: [PATCH 05/14] rpcsrv: don't init Oracle in New, drop oracle dependency The only thing rpcsrv needs is AddResponse callback. --- pkg/services/oracle/broadcaster/oracle.go | 10 +++---- pkg/services/oracle/oracle.go | 36 +++-------------------- pkg/services/oracle/request.go | 4 +-- pkg/services/rpcsrv/server.go | 13 ++++---- 4 files changed, 18 insertions(+), 45 deletions(-) diff --git a/pkg/services/oracle/broadcaster/oracle.go b/pkg/services/oracle/broadcaster/oracle.go index b14b09e5c..0655580b6 100644 --- a/pkg/services/oracle/broadcaster/oracle.go +++ b/pkg/services/oracle/broadcaster/oracle.go @@ -10,7 +10,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/services/helpers/rpcbroadcaster" - "github.com/nspcc-dev/neo-go/pkg/services/oracle" "go.uber.org/zap" ) @@ -20,16 +19,17 @@ const ( defaultChanCapacity = 16 ) -type oracleBroadcaster struct { +// OracleBroadcaster is an oracle broadcaster implementation. +type OracleBroadcaster struct { rpcbroadcaster.RPCBroadcaster } // New returns a new struct capable of broadcasting oracle responses. -func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { +func New(cfg config.OracleConfiguration, log *zap.Logger) *OracleBroadcaster { if cfg.ResponseTimeout == 0 { cfg.ResponseTimeout = defaultSendTimeout } - r := &oracleBroadcaster{ + r := &OracleBroadcaster{ RPCBroadcaster: *rpcbroadcaster.NewRPCBroadcaster(log, cfg.ResponseTimeout), } for i := range cfg.Nodes { @@ -40,7 +40,7 @@ func New(cfg config.OracleConfiguration, log *zap.Logger) oracle.Broadcaster { } // SendResponse implements interfaces.Broadcaster. -func (r *oracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { +func (r *OracleBroadcaster) SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { pub := priv.PublicKey() data := GetMessage(pub.Bytes(), resp.ID, txSig) msgSig := priv.Sign(data) diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index a88077dee..96cfdb4dd 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util/slice" @@ -43,9 +44,6 @@ type ( oracleScript []byte verifyOffset int - // mtx protects setting callbacks. - mtx sync.RWMutex - // accMtx protects account and oracle nodes. accMtx sync.RWMutex currAccount *wallet.Account @@ -94,8 +92,6 @@ type ( Shutdown() } - defaultResponseHandler struct{} - // TxCallback executes on new transactions when they are ready to be pooled. TxCallback = func(tx *transaction.Transaction) error ) @@ -165,7 +161,7 @@ func NewOracle(cfg Config) (*Oracle, error) { } if o.ResponseHandler == nil { - o.ResponseHandler = defaultResponseHandler{} + o.ResponseHandler = broadcaster.New(cfg.MainCfg, cfg.Log) } if o.OnTransaction == nil { o.OnTransaction = func(*transaction.Transaction) error { return nil } @@ -192,7 +188,7 @@ func (o *Oracle) Shutdown() { } o.running = false close(o.close) - o.getBroadcaster().Shutdown() + o.ResponseHandler.Shutdown() <-o.done } @@ -217,6 +213,7 @@ func (o *Oracle) start() { for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ { go o.runRequestWorker() } + go o.ResponseHandler.Run() tick := time.NewTicker(o.MainCfg.RefreshInterval) main: @@ -284,28 +281,3 @@ func (o *Oracle) sendTx(tx *transaction.Transaction) { zap.Error(err)) } } - -func (o *Oracle) getBroadcaster() Broadcaster { - o.mtx.RLock() - defer o.mtx.RUnlock() - return o.ResponseHandler -} - -// SetBroadcaster sets callback to broadcast response. -func (o *Oracle) SetBroadcaster(b Broadcaster) { - o.mtx.Lock() - defer o.mtx.Unlock() - o.ResponseHandler.Shutdown() - o.ResponseHandler = b - go b.Run() -} - -// SendResponse implements Broadcaster interface. -func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) { -} - -// Run implements Broadcaster interface. -func (defaultResponseHandler) Run() {} - -// Shutdown implements Broadcaster interface. -func (defaultResponseHandler) Shutdown() {} diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index 9eba982ac..1b6ed6197 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -234,7 +234,7 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error { incTx.attempts++ incTx.Unlock() - o.getBroadcaster().SendResponse(priv, resp, txSig) + o.ResponseHandler.SendResponse(priv, resp, txSig) if ready { o.sendTx(readyTx) } @@ -265,7 +265,7 @@ func (o *Oracle) processFailedRequest(priv *keys.PrivateKey, req request) { txSig := incTx.backupSigs[string(priv.PublicKey().Bytes())].sig incTx.Unlock() - o.getBroadcaster().SendResponse(priv, getFailedResponse(req.ID), txSig) + o.ResponseHandler.SendResponse(priv, getFailedResponse(req.ID), txSig) if ready { o.sendTx(readyTx) } diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index b08968046..64e78656a 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -43,7 +43,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/network/payload" - "github.com/nspcc-dev/neo-go/pkg/services/oracle" "github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster" "github.com/nspcc-dev/neo-go/pkg/services/rpcsrv/params" "github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag" @@ -108,6 +107,11 @@ type ( mempool.Feer // fee interface } + // OracleHandler is the interface oracle service needs to provide for the Server. + OracleHandler interface { + AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte) + } + // Server represents the JSON-RPC 2.0 server. Server struct { *http.Server @@ -118,7 +122,7 @@ type ( network netmode.Magic stateRootEnabled bool coreServer *network.Server - oracle *oracle.Oracle + oracle OracleHandler log *zap.Logger https *http.Server shutdown chan struct{} @@ -248,7 +252,7 @@ var upgrader = websocket.Upgrader{} // New creates a new Server struct. func New(chain Ledger, conf config.RPC, coreServer *network.Server, - orc *oracle.Oracle, log *zap.Logger, errChan chan error) Server { + orc OracleHandler, log *zap.Logger, errChan chan error) Server { httpServer := &http.Server{ Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), } @@ -260,9 +264,6 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, } } - if orc != nil { - orc.SetBroadcaster(broadcaster.New(orc.MainCfg, log)) - } protoCfg := chain.GetConfig() if conf.SessionEnabled { if conf.SessionExpirationTime <= 0 { From 2adcf406d3fcd92d4f002f3b05a9f08155822e94 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 21:36:37 +0300 Subject: [PATCH 06/14] cli: reload Oracle service on USR1 Which allows to enable/disable the service, change nodes, keys and other settings. Unfortunately, atomic.Value doesn't allow Store(nil), so we have to store a pointer there that can point to nil interface. --- cli/server/server.go | 37 ++++++++++++++++++++++-------- pkg/core/blockchain.go | 28 ++++++++++++++++------ pkg/core/native/designate.go | 4 ++-- pkg/core/native/oracle.go | 34 ++++++++++++++------------- pkg/services/oracle/oracle.go | 1 + pkg/services/oracle/oracle_test.go | 2 +- pkg/services/rpcsrv/server.go | 18 +++++++++++---- 7 files changed, 85 insertions(+), 39 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index 42b699bd8..dc99df1e9 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/core" "github.com/nspcc-dev/neo-go/pkg/core/block" @@ -385,14 +386,14 @@ func restoreDB(ctx *cli.Context) error { return nil } -func mkOracle(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) { - if !config.OracleCfg.Enabled { +func mkOracle(config config.OracleConfiguration, magic netmode.Magic, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*oracle.Oracle, error) { + if !config.Enabled { return nil, nil } orcCfg := oracle.Config{ Log: log, - Network: config.Net, - MainCfg: config.OracleCfg, + Network: magic, + MainCfg: config, Chain: chain, OnTransaction: serv.RelayTxn, } @@ -492,7 +493,7 @@ func startServer(ctx *cli.Context) error { } serv.AddExtensibleService(sr, stateroot.Category, sr.OnPayload) - oracleSrv, err := mkOracle(serverConfig, chain, serv, log) + oracleSrv, err := mkOracle(cfg.ApplicationConfiguration.Oracle, cfg.ProtocolConfiguration.Magic, chain, serv, log) if err != nil { return cli.NewExitError(err, 1) } @@ -513,8 +514,9 @@ func startServer(ctx *cli.Context) error { rpcServer.Start() } - sighupCh := make(chan os.Signal, 1) - signal.Notify(sighupCh, syscall.SIGHUP) + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP) + signal.Notify(sigCh, syscall.SIGUSR1) fmt.Fprintln(ctx.App.Writer, Logo()) fmt.Fprintln(ctx.App.Writer, serv.UserAgent) @@ -527,7 +529,7 @@ Main: case err := <-errChan: shutdownErr = fmt.Errorf("server error: %w", err) cancel() - case sig := <-sighupCh: + case sig := <-sigCh: log.Info("signal received", zap.Stringer("name", sig)) cfgnew, err := getConfigFromContext(ctx) if err != nil { @@ -557,10 +559,27 @@ Main: prometheus.ShutDown() prometheus = metrics.NewPrometheusService(cfgnew.ApplicationConfiguration.Prometheus, log) go prometheus.Start() + case syscall.SIGUSR1: + if oracleSrv != nil { + chain.SetOracle(nil) + rpcServer.SetOracleHandler(nil) + oracleSrv.Shutdown() + } + oracleSrv, err = mkOracle(cfgnew.ApplicationConfiguration.Oracle, cfgnew.ProtocolConfiguration.Magic, chain, serv, log) + if err != nil { + log.Error("failed to create oracle service", zap.Error(err)) + break // Keep going. + } + if oracleSrv != nil { + rpcServer.SetOracleHandler(oracleSrv) + if serv.IsInSync() { + oracleSrv.Start() + } + } } cfg = cfgnew case <-grace.Done(): - signal.Stop(sighupCh) + signal.Stop(sigCh) serv.Shutdown() break Main } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index ca47cb7f2..0247518aa 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -303,14 +303,28 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L // must be called before `bc.Run()` to avoid data race. func (bc *Blockchain) SetOracle(mod native.OracleService) { orc := bc.contracts.Oracle - md, ok := orc.GetMethod(manifest.MethodVerify, -1) - if !ok { - panic(fmt.Errorf("%s method not found", manifest.MethodVerify)) + if mod != nil { + md, ok := orc.GetMethod(manifest.MethodVerify, -1) + if !ok { + panic(fmt.Errorf("%s method not found", manifest.MethodVerify)) + } + mod.UpdateNativeContract(orc.NEF.Script, orc.GetOracleResponseScript(), + orc.Hash, md.MD.Offset) + keys, _, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, noderoles.Oracle, bc.BlockHeight()) + if err != nil { + bc.log.Error("failed to get oracle key list") + return + } + mod.UpdateOracleNodes(keys) + reqs, err := bc.contracts.Oracle.GetRequests(bc.dao) + if err != nil { + bc.log.Error("failed to get current oracle request list") + return + } + mod.AddRequests(reqs) } - mod.UpdateNativeContract(orc.NEF.Script, orc.GetOracleResponseScript(), - orc.Hash, md.MD.Offset) - orc.Module.Store(mod) - bc.contracts.Designate.OracleService.Store(mod) + orc.Module.Store(&mod) + bc.contracts.Designate.OracleService.Store(&mod) } // SetNotary sets notary module. It doesn't protected by mutex and diff --git a/pkg/core/native/designate.go b/pkg/core/native/designate.go index 492e8849f..8427e5fc8 100644 --- a/pkg/core/native/designate.go +++ b/pkg/core/native/designate.go @@ -238,8 +238,8 @@ func (s *Designate) updateCachedRoleData(cache *DesignationCache, d *dao.Simple, func (s *Designate) notifyRoleChanged(v *roleData, r noderoles.Role) { switch r { case noderoles.Oracle: - if orc, _ := s.OracleService.Load().(OracleService); orc != nil { - orc.UpdateOracleNodes(v.nodes.Copy()) + if orc, _ := s.OracleService.Load().(*OracleService); orc != nil && *orc != nil { + (*orc).UpdateOracleNodes(v.nodes.Copy()) } case noderoles.P2PNotary: if ntr, _ := s.NotaryService.Load().(NotaryService); ntr != nil { diff --git a/pkg/core/native/oracle.go b/pkg/core/native/oracle.go index a31eb929d..62ac80b00 100644 --- a/pkg/core/native/oracle.go +++ b/pkg/core/native/oracle.go @@ -113,7 +113,10 @@ func copyOracleCache(src, dst *OracleCache) { } func newOracle() *Oracle { - o := &Oracle{ContractMD: *interop.NewContractMD(nativenames.Oracle, oracleContractID)} + o := &Oracle{ + ContractMD: *interop.NewContractMD(nativenames.Oracle, oracleContractID), + newRequests: make(map[uint64]*state.OracleRequest), + } defer o.UpdateHash() o.oracleScript = CreateOracleResponseScript(o.Hash) @@ -161,11 +164,7 @@ func (o *Oracle) GetOracleResponseScript() []byte { // OnPersist implements the Contract interface. func (o *Oracle) OnPersist(ic *interop.Context) error { - var err error - if o.newRequests == nil { - o.newRequests, err = o.getRequests(ic.DAO) - } - return err + return nil } // PostPersist represents `postPersist` method. @@ -177,7 +176,7 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { single := big.NewInt(p) var removedIDs []uint64 - orc, _ := o.Module.Load().(OracleService) + orc, _ := o.Module.Load().(*OracleService) for _, tx := range ic.Block.Transactions { resp := getResponse(tx) if resp == nil { @@ -189,7 +188,7 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { continue } ic.DAO.DeleteStorageItem(o.ID, reqKey) - if orc != nil { + if orc != nil && *orc != nil { removedIDs = append(removedIDs, resp.ID) } @@ -229,8 +228,8 @@ func (o *Oracle) PostPersist(ic *interop.Context) error { o.GAS.mint(ic, nodes[i].GetScriptHash(), &reward[i], false) } - if len(removedIDs) != 0 && orc != nil { - orc.RemoveRequests(removedIDs) + if len(removedIDs) != 0 { + (*orc).RemoveRequests(removedIDs) } return o.updateCache(ic.DAO) } @@ -415,7 +414,10 @@ func (o *Oracle) PutRequestInternal(id uint64, req *state.OracleRequest, d *dao. if err := putConvertibleToDAO(o.ID, d, reqKey, req); err != nil { return err } - o.newRequests[id] = req + orc, _ := o.Module.Load().(*OracleService) + if orc != nil && *orc != nil { + o.newRequests[id] = req + } // Add request ID to the id list. lst := new(IDList) @@ -493,8 +495,8 @@ func (o *Oracle) getOriginalTxID(d *dao.Simple, tx *transaction.Transaction) uti return tx.Hash() } -// getRequests returns all requests which have not been finished yet. -func (o *Oracle) getRequests(d *dao.Simple) (map[uint64]*state.OracleRequest, error) { +// GetRequests returns all requests which have not been finished yet. +func (o *Oracle) GetRequests(d *dao.Simple) (map[uint64]*state.OracleRequest, error) { var reqs = make(map[uint64]*state.OracleRequest) var err error d.Seek(o.ID, storage.SeekRange{Prefix: prefixRequest}, func(k, v []byte) bool { @@ -534,8 +536,8 @@ func (o *Oracle) getConvertibleFromDAO(d *dao.Simple, key []byte, item stackitem // updateCache updates cached Oracle values if they've been changed. func (o *Oracle) updateCache(d *dao.Simple) error { - orc, _ := o.Module.Load().(OracleService) - if orc == nil { + orc, _ := o.Module.Load().(*OracleService) + if orc == nil || *orc == nil { return nil } @@ -547,7 +549,7 @@ func (o *Oracle) updateCache(d *dao.Simple) error { delete(reqs, id) } } - orc.AddRequests(reqs) + (*orc).AddRequests(reqs) return nil } diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 96cfdb4dd..d765db731 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -186,6 +186,7 @@ func (o *Oracle) Shutdown() { if !o.running { return } + o.Log.Info("stopping oracle service") o.running = false close(o.close) o.ResponseHandler.Shutdown() diff --git a/pkg/services/oracle/oracle_test.go b/pkg/services/oracle/oracle_test.go index f8fc6c023..f30aebbb1 100644 --- a/pkg/services/oracle/oracle_test.go +++ b/pkg/services/oracle/oracle_test.go @@ -121,8 +121,8 @@ func TestCreateResponseTx(t *testing.T) { Result: []byte{0}, } cInvoker.Invoke(t, stackitem.Null{}, "requestURL", req.URL, *req.Filter, req.CallbackMethod, req.UserData, int64(req.GasForResponse)) - orc.UpdateOracleNodes(keys.PublicKeys{acc.PrivateKey().PublicKey()}) bc.SetOracle(orc) + orc.UpdateOracleNodes(keys.PublicKeys{acc.PrivateKey().PublicKey()}) tx, err = orc.CreateResponseTx(int64(req.GasForResponse), 1, resp) require.NoError(t, err) assert.Equal(t, 166, tx.Size()) diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index 64e78656a..b7534c13b 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -122,7 +122,7 @@ type ( network netmode.Magic stateRootEnabled bool coreServer *network.Server - oracle OracleHandler + oracle *atomic.Value log *zap.Logger https *http.Server shutdown chan struct{} @@ -275,6 +275,10 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, log.Info("SessionPoolSize is not set or wrong, setting default value", zap.Int("SessionPoolSize", defaultSessionPoolSize)) } } + var oracleWrapped = new(atomic.Value) + if orc != nil { + oracleWrapped.Store(&orc) + } return Server{ Server: httpServer, chain: chain, @@ -284,7 +288,7 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, stateRootEnabled: protoCfg.StateRootInHeader, coreServer: coreServer, log: log, - oracle: orc, + oracle: oracleWrapped, https: tlsServer, shutdown: make(chan struct{}), started: atomic.NewBool(false), @@ -400,6 +404,11 @@ func (s *Server) Shutdown() { <-s.executionCh } +// SetOracleHandler allows to update oracle handler used by the Server. +func (s *Server) SetOracleHandler(orc OracleHandler) { + s.oracle.Store(&orc) +} + func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) { req := params.NewRequest() @@ -2328,7 +2337,8 @@ func getRelayResult(err error, hash util.Uint256) (interface{}, *neorpc.Error) { } func (s *Server) submitOracleResponse(ps params.Params) (interface{}, *neorpc.Error) { - if s.oracle == nil { + oracle := s.oracle.Load().(*OracleHandler) + if oracle == nil || *oracle == nil { return nil, neorpc.NewRPCError("Oracle is not enabled", "") } var pub *keys.PublicKey @@ -2355,7 +2365,7 @@ func (s *Server) submitOracleResponse(ps params.Params) (interface{}, *neorpc.Er if !pub.Verify(msgSig, hash.Sha256(data).BytesBE()) { return nil, neorpc.NewRPCError("Invalid request signature", "") } - s.oracle.AddResponse(pub, uint64(reqID), txSig) + (*oracle).AddResponse(pub, uint64(reqID), txSig) return json.RawMessage([]byte("{}")), nil } From 9341bb6628e9cde617d840cc17c9a12c18b41e7e Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 22:03:58 +0300 Subject: [PATCH 07/14] cli: restart notary service on USR1 --- cli/server/server.go | 20 ++++++++++++++++---- pkg/core/blockchain.go | 10 +++++++++- pkg/core/native/designate.go | 4 ++-- pkg/services/notary/notary.go | 1 + 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index dc99df1e9..d24f9b4eb 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -427,15 +427,15 @@ func mkConsensus(config network.ServerConfig, chain *core.Blockchain, serv *netw return srv, nil } -func mkP2PNotary(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*notary.Notary, error) { - if !config.P2PNotaryCfg.Enabled { +func mkP2PNotary(config config.P2PNotary, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (*notary.Notary, error) { + if !config.Enabled { return nil, nil } if !chain.P2PSigExtensionsEnabled() { return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enabled") } cfg := notary.Config{ - MainCfg: config.P2PNotaryCfg, + MainCfg: config, Chain: chain, Log: log, } @@ -501,7 +501,7 @@ func startServer(ctx *cli.Context) error { if err != nil { return cli.NewExitError(err, 1) } - _, err = mkP2PNotary(serverConfig, chain, serv, log) + p2pNotary, err := mkP2PNotary(cfg.ApplicationConfiguration.P2PNotary, chain, serv, log) if err != nil { return cli.NewExitError(err, 1) } @@ -576,6 +576,18 @@ Main: oracleSrv.Start() } } + if p2pNotary != nil { + chain.SetNotary(nil) + p2pNotary.Shutdown() + } + p2pNotary, err = mkP2PNotary(cfgnew.ApplicationConfiguration.P2PNotary, chain, serv, log) + if err != nil { + log.Error("failed to create notary service", zap.Error(err)) + break // Keep going. + } + if p2pNotary != nil && serv.IsInSync() { + p2pNotary.Start() + } } cfg = cfgnew case <-grace.Done(): diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 0247518aa..73bb05915 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -330,7 +330,15 @@ func (bc *Blockchain) SetOracle(mod native.OracleService) { // SetNotary sets notary module. It doesn't protected by mutex and // must be called before `bc.Run()` to avoid data race. func (bc *Blockchain) SetNotary(mod native.NotaryService) { - bc.contracts.Designate.NotaryService.Store(mod) + if mod != nil { + keys, _, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, noderoles.P2PNotary, bc.BlockHeight()) + if err != nil { + bc.log.Error("failed to get notary key list") + return + } + mod.UpdateNotaryNodes(keys) + } + bc.contracts.Designate.NotaryService.Store(&mod) } func (bc *Blockchain) init() error { diff --git a/pkg/core/native/designate.go b/pkg/core/native/designate.go index 8427e5fc8..caf587ccf 100644 --- a/pkg/core/native/designate.go +++ b/pkg/core/native/designate.go @@ -242,8 +242,8 @@ func (s *Designate) notifyRoleChanged(v *roleData, r noderoles.Role) { (*orc).UpdateOracleNodes(v.nodes.Copy()) } case noderoles.P2PNotary: - if ntr, _ := s.NotaryService.Load().(NotaryService); ntr != nil { - ntr.UpdateNotaryNodes(v.nodes.Copy()) + if ntr, _ := s.NotaryService.Load().(*NotaryService); ntr != nil && *ntr != nil { + (*ntr).UpdateNotaryNodes(v.nodes.Copy()) } case noderoles.StateValidator: if s.StateRootService != nil { diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index b3141a2c8..0fb1a6c90 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -219,6 +219,7 @@ func (n *Notary) Shutdown() { if !n.started.CAS(true, false) { return } + n.Config.Log.Info("stopping notary service") close(n.stopCh) <-n.done } From bf92966633cc4d1a8f5816c366460480ca909b9e Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 22:27:36 +0300 Subject: [PATCH 08/14] cli: reload state root service on USR1 It's a bit special since it's _always_ present to catch stateroots from the network. --- cli/server/server.go | 11 +++++++++++ pkg/services/stateroot/validators.go | 1 + 2 files changed, 12 insertions(+) diff --git a/cli/server/server.go b/cli/server/server.go index d24f9b4eb..6294044e4 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -588,6 +588,17 @@ Main: if p2pNotary != nil && serv.IsInSync() { p2pNotary.Start() } + srMod.SetUpdateValidatorsCallback(nil) + sr.Shutdown() + sr, err = stateroot.New(cfgnew.ApplicationConfiguration.StateRoot, srMod, log, chain, serv.BroadcastExtensible) + if err != nil { + log.Error("failed to create state validation service", zap.Error(err)) + break // The show must go on. + } + serv.AddExtensibleService(sr, stateroot.Category, sr.OnPayload) + if serv.IsInSync() { + sr.Start() + } } cfg = cfgnew case <-grace.Done(): diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 536440aaf..5a3ce73a4 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -71,6 +71,7 @@ func (s *service) Shutdown() { if !s.started.CAS(true, false) { return } + s.log.Info("stopping state validation service") close(s.stopCh) <-s.done } From 5a7fa2d3dfe1365da2b51e3ef127395a61ead722 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Jul 2022 22:41:52 +0300 Subject: [PATCH 09/14] cli: restart consensus service on USR2 Fix #1949. Also drop wallet from the ServerConfig since it's not used in any meaningful way after this change. --- cli/executor_test.go | 2 +- cli/server/server.go | 24 +++++++++++++++++++----- pkg/consensus/consensus.go | 1 + pkg/network/helper_test.go | 4 ---- pkg/network/server_config.go | 9 --------- pkg/network/server_test.go | 12 +++++++++--- 6 files changed, 30 insertions(+), 22 deletions(-) diff --git a/cli/executor_test.go b/cli/executor_test.go index 93d2f9843..b3564baa4 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -147,7 +147,7 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch Chain: chain, ProtocolConfiguration: chain.GetConfig(), RequestTx: netSrv.RequestTx, - Wallet: serverConfig.Wallet, + Wallet: &cfg.ApplicationConfiguration.UnlockWallet, TimePerBlock: serverConfig.TimePerBlock, }) require.NoError(t, err) diff --git a/cli/server/server.go b/cli/server/server.go index 6294044e4..4c60e3ce4 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -9,6 +9,7 @@ import ( "os/signal" "runtime" "syscall" + "time" "github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/pkg/config" @@ -406,8 +407,8 @@ func mkOracle(config config.OracleConfiguration, magic netmode.Magic, chain *cor return orc, nil } -func mkConsensus(config network.ServerConfig, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (consensus.Service, error) { - if config.Wallet == nil { +func mkConsensus(config config.Wallet, tpb time.Duration, chain *core.Blockchain, serv *network.Server, log *zap.Logger) (consensus.Service, error) { + if len(config.Path) == 0 { return nil, nil } srv, err := consensus.NewService(consensus.Config{ @@ -416,8 +417,8 @@ func mkConsensus(config network.ServerConfig, chain *core.Blockchain, serv *netw Chain: chain, ProtocolConfiguration: chain.GetConfig(), RequestTx: serv.RequestTx, - Wallet: config.Wallet, - TimePerBlock: config.TimePerBlock, + Wallet: &config, + TimePerBlock: tpb, }) if err != nil { return nil, fmt.Errorf("can't initialize Consensus module: %w", err) @@ -497,7 +498,7 @@ func startServer(ctx *cli.Context) error { if err != nil { return cli.NewExitError(err, 1) } - _, err = mkConsensus(serverConfig, chain, serv, log) + dbftSrv, err := mkConsensus(cfg.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log) if err != nil { return cli.NewExitError(err, 1) } @@ -517,6 +518,7 @@ func startServer(ctx *cli.Context) error { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGHUP) signal.Notify(sigCh, syscall.SIGUSR1) + signal.Notify(sigCh, syscall.SIGUSR2) fmt.Fprintln(ctx.App.Writer, Logo()) fmt.Fprintln(ctx.App.Writer, serv.UserAgent) @@ -599,6 +601,18 @@ Main: if serv.IsInSync() { sr.Start() } + case syscall.SIGUSR2: + if dbftSrv != nil { + dbftSrv.Shutdown() + } + dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log) + if err != nil { + log.Error("failed to create consensus service", zap.Error(err)) + break // Whatever happens, I'll leave it all to chance. + } + if dbftSrv != nil && serv.IsInSync() { + dbftSrv.Start() + } } cfg = cfgnew case <-grace.Done(): diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index c9bce1663..977f2cd90 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -276,6 +276,7 @@ func (s *service) Start() { // Shutdown implements the Service interface. func (s *service) Shutdown() { if s.started.CAS(true, false) { + s.log.Info("stopping consensus service") close(s.quit) <-s.finished } diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index cc2e8ee53..c8847ed00 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -196,10 +196,6 @@ func newTestServerWithCustomCfg(t *testing.T, serverConfig ServerConfig, protoco s, err := newServerFromConstructors(serverConfig, fakechain.NewFakeChainWithCustomCfg(protocolCfg), new(fakechain.FakeStateSync), zaptest.NewLogger(t), newFakeTransp, newTestDiscovery) require.NoError(t, err) - if serverConfig.Wallet != nil { - cons := new(fakeConsensus) - s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) - } t.Cleanup(s.discovery.Close) return s } diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index 4bfdfb4b8..e46f52900 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -64,9 +64,6 @@ type ( // Level of the internal logger. LogLevel zapcore.Level - // Wallet is a wallet configuration. - Wallet *config.Wallet - // TimePerBlock is an interval which should pass between two successive blocks. TimePerBlock time.Duration @@ -90,11 +87,6 @@ func NewServerConfig(cfg config.Config) ServerConfig { appConfig := cfg.ApplicationConfiguration protoConfig := cfg.ProtocolConfiguration - var wc *config.Wallet - if appConfig.UnlockWallet.Path != "" { - wc = &appConfig.UnlockWallet - } - return ServerConfig{ UserAgent: cfg.GenerateUserAgent(), Address: appConfig.Address, @@ -110,7 +102,6 @@ func NewServerConfig(cfg config.Config) ServerConfig { MaxPeers: appConfig.MaxPeers, AttemptConnPeers: appConfig.AttemptConnPeers, MinPeers: appConfig.MinPeers, - Wallet: wc, TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second, OracleCfg: appConfig.Oracle, P2PNotaryCfg: appConfig.P2PNotary, diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 514851ab9..c80f4eb4b 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -109,7 +109,9 @@ func TestServerStartAndShutdown(t *testing.T) { require.True(t, errors.Is(err, errServerShutdown)) }) t.Run("with consensus", func(t *testing.T) { - s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) + s := newTestServer(t, ServerConfig{}) + cons := new(fakeConsensus) + s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) ch := startWithChannel(s) p := newLocalPeer(t, s) @@ -409,7 +411,9 @@ func TestBlock(t *testing.T) { } func TestConsensus(t *testing.T) { - s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) + s := newTestServer(t, ServerConfig{}) + cons := new(fakeConsensus) + s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) startWithCleanup(t, s) atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) @@ -452,7 +456,9 @@ func TestConsensus(t *testing.T) { } func TestTransaction(t *testing.T) { - s := newTestServer(t, ServerConfig{Wallet: new(config.Wallet)}) + s := newTestServer(t, ServerConfig{}) + cons := new(fakeConsensus) + s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) startWithCleanup(t, s) t.Run("good", func(t *testing.T) { From 94a8784dcbb123664e45cbdf6a0b4bdd67c745e3 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 27 Jul 2022 11:25:58 +0300 Subject: [PATCH 10/14] network: allow to drop services and solve concurrency issues Now that services can come and go we need to protect all of the associated fields and allow to deregister them. --- cli/server/server.go | 7 +++- pkg/network/server.go | 84 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index 4c60e3ce4..505338b49 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -549,9 +549,10 @@ Main: configureAddresses(&cfgnew.ApplicationConfiguration) switch sig { case syscall.SIGHUP: + serv.DelService(&rpcServer) rpcServer.Shutdown() rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) - serv.AddService(&rpcServer) // Replaces old one by service name. + serv.AddService(&rpcServer) if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { rpcServer.Start() } @@ -563,6 +564,7 @@ Main: go prometheus.Start() case syscall.SIGUSR1: if oracleSrv != nil { + serv.DelService(oracleSrv) chain.SetOracle(nil) rpcServer.SetOracleHandler(nil) oracleSrv.Shutdown() @@ -579,6 +581,7 @@ Main: } } if p2pNotary != nil { + serv.DelService(p2pNotary) chain.SetNotary(nil) p2pNotary.Shutdown() } @@ -590,6 +593,7 @@ Main: if p2pNotary != nil && serv.IsInSync() { p2pNotary.Start() } + serv.DelExtensibleService(sr, stateroot.Category) srMod.SetUpdateValidatorsCallback(nil) sr.Shutdown() sr, err = stateroot.New(cfgnew.ApplicationConfiguration.StateRoot, srMod, log, chain, serv.BroadcastExtensible) @@ -603,6 +607,7 @@ Main: } case syscall.SIGUSR2: if dbftSrv != nil { + serv.DelExtensibleHPService(dbftSrv, consensus.Category) dbftSrv.Shutdown() } dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log) diff --git a/pkg/network/server.go b/pkg/network/server.go index 929271e99..b69be9ebc 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -101,10 +101,12 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer - services map[string]Service - extensHandlers map[string]func(*payload.Extensible) error - extensHighPrio string - txCallback func(*transaction.Transaction) + + serviceLock sync.RWMutex + services map[string]Service + extensHandlers map[string]func(*payload.Extensible) error + extensHighPrio string + txCallback func(*transaction.Transaction) txInLock sync.Mutex txInMap map[util.Uint256]struct{} @@ -263,9 +265,11 @@ func (s *Server) Shutdown() { } s.bQueue.discard() s.bSyncQueue.discard() + s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() } + s.serviceLock.RUnlock() if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.StopSubscriptions() } @@ -274,20 +278,72 @@ func (s *Server) Shutdown() { // AddService allows to add a service to be started/stopped by Server. func (s *Server) AddService(svc Service) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.addService(svc) +} + +// addService is an unlocked version of AddService. +func (s *Server) addService(svc Service) { s.services[svc.Name()] = svc } // AddExtensibleService register a service that handles an extensible payload of some kind. func (s *Server) AddExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.addExtensibleService(svc, category, handler) +} + +// addExtensibleService is an unlocked version of AddExtensibleService. +func (s *Server) addExtensibleService(svc Service, category string, handler func(*payload.Extensible) error) { s.extensHandlers[category] = handler - s.AddService(svc) + s.addService(svc) } // AddExtensibleHPService registers a high-priority service that handles an extensible payload of some kind. func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() s.txCallback = txCallback s.extensHighPrio = category - s.AddExtensibleService(svc, category, handler) + s.addExtensibleService(svc, category, handler) +} + +// DelService drops a service from the list, use it when the service is stopped +// outside of the Server. +func (s *Server) DelService(svc Service) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.delService(svc) +} + +// delService is an unlocked version of DelService. +func (s *Server) delService(svc Service) { + delete(s.services, svc.Name()) +} + +// DelExtensibleService drops a service that handler extensible payloads from the +// list, use it when the service is stopped outside of the Server. +func (s *Server) DelExtensibleService(svc Service, category string) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.delExtensibleService(svc, category) +} + +// delExtensibleService is an unlocked version of DelExtensibleService. +func (s *Server) delExtensibleService(svc Service, category string) { + delete(s.extensHandlers, category) + s.delService(svc) +} + +// DelExtensibleHPService unregisters a high-priority service that handles an extensible payload of some kind. +func (s *Server) DelExtensibleHPService(svc Service, category string) { + s.serviceLock.Lock() + defer s.serviceLock.Unlock() + s.txCallback = nil + s.extensHighPrio = "" + s.delExtensibleService(svc, category) } // GetNotaryPool allows to retrieve notary pool, if it's configured. @@ -428,9 +484,11 @@ func (s *Server) tryStartServices() { if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. } + s.serviceLock.RLock() for _, svc := range s.services { svc.Start() } + s.serviceLock.RUnlock() } } @@ -931,7 +989,9 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { if !ok { // payload is already in cache return nil } + s.serviceLock.RLock() handler := s.extensHandlers[e.Category] + s.serviceLock.RUnlock() if handler != nil { err = handler(e) if err != nil { @@ -944,7 +1004,10 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { func (s *Server) advertiseExtensible(e *payload.Extensible) { msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) - if e.Category == s.extensHighPrio { + s.serviceLock.RLock() + hp := s.extensHighPrio + s.serviceLock.RUnlock() + if e.Category == hp { // It's high priority because it directly affects consensus process, // even though it's just an inv. s.broadcastHPMessage(msg) @@ -966,8 +1029,11 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { } s.txInMap[tx.Hash()] = struct{}{} s.txInLock.Unlock() - if s.txCallback != nil { - s.txCallback(tx) + s.serviceLock.RLock() + txCallback := s.txCallback + s.serviceLock.RUnlock() + if txCallback != nil { + txCallback(tx) } if s.verifyAndPoolTX(tx) == nil { s.broadcastTX(tx, nil) From e1ef3c45ce145a934fc749391bb8f66f64d9cd56 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 27 Jul 2022 12:23:49 +0300 Subject: [PATCH 11/14] docs: update documentation for signals --- docs/cli.md | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/docs/cli.md b/docs/cli.md index dbbbdfd91..f2f8ab15d 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -68,12 +68,26 @@ current height of the node. ### Restarting node services -To restart some node services without full node restart, send the SIGHUP -signal. List of the services to be restarted on SIGHUP receiving: +On Unix-like platforms HUP, USR1 and USR2 signals can be used to control node +services. Upon receiving any of these signals node rereads the configuration +file, checks for its compatibility (ProtocolConfiguration can't be changed and +ApplicationConfiguration can only be changed for services) and then +stops/starts services according to the old and new configurations. Services +are broadly split into three main categories: + * client-oriented + These provide some service to clients: RPC, Pprof and Prometheus + servers. They're controlled with the HUP signal. + * network-oriented + These provide some service to the network: Oracle, State validation and P2P + Notary. They're controlled with the USR1 signal. + * consensus + That's dBFT, it's a special one and it's controlled with USR2. -| Service | Action | -| --- | --- | -| RPC server | Restarting with the old configuration and updated TLS certificates | +Typical scenarios when this can be useful (without full node restart): + * enabling some service + * changing RPC configuration + * updating TLS certificates for the RPC server + * resolving operational issues ### DB import/exports From b92896b0c0e0bf94fc0e916048b74beb33274a79 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 27 Jul 2022 12:51:41 +0300 Subject: [PATCH 12/14] cli: fix FTBFS on Windows It has a stub for SIGHUP, but doesn't have anything for USR1 and USR2: Error: cli\server\server.go:520:31: undefined: syscall.SIGUSR1 Error: cli\server\server.go:521:31: undefined: syscall.SIGUSR2 Error: cli\server\server.go:565:17: undefined: syscall.SIGUSR1 Error: cli\server\server.go:608:17: undefined: syscall.SIGUSR2 --- cli/server/server.go | 13 ++++++------- cli/server/signals_unix.go | 12 ++++++++++++ cli/server/signals_windows.go | 13 +++++++++++++ 3 files changed, 31 insertions(+), 7 deletions(-) create mode 100644 cli/server/signals_unix.go create mode 100644 cli/server/signals_windows.go diff --git a/cli/server/server.go b/cli/server/server.go index 505338b49..f40d1405d 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -8,7 +8,6 @@ import ( "os" "os/signal" "runtime" - "syscall" "time" "github.com/nspcc-dev/neo-go/cli/options" @@ -516,9 +515,9 @@ func startServer(ctx *cli.Context) error { } sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGHUP) - signal.Notify(sigCh, syscall.SIGUSR1) - signal.Notify(sigCh, syscall.SIGUSR2) + signal.Notify(sigCh, sighup) + signal.Notify(sigCh, sigusr1) + signal.Notify(sigCh, sigusr2) fmt.Fprintln(ctx.App.Writer, Logo()) fmt.Fprintln(ctx.App.Writer, serv.UserAgent) @@ -548,7 +547,7 @@ Main: } configureAddresses(&cfgnew.ApplicationConfiguration) switch sig { - case syscall.SIGHUP: + case sighup: serv.DelService(&rpcServer) rpcServer.Shutdown() rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) @@ -562,7 +561,7 @@ Main: prometheus.ShutDown() prometheus = metrics.NewPrometheusService(cfgnew.ApplicationConfiguration.Prometheus, log) go prometheus.Start() - case syscall.SIGUSR1: + case sigusr1: if oracleSrv != nil { serv.DelService(oracleSrv) chain.SetOracle(nil) @@ -605,7 +604,7 @@ Main: if serv.IsInSync() { sr.Start() } - case syscall.SIGUSR2: + case sigusr2: if dbftSrv != nil { serv.DelExtensibleHPService(dbftSrv, consensus.Category) dbftSrv.Shutdown() diff --git a/cli/server/signals_unix.go b/cli/server/signals_unix.go new file mode 100644 index 000000000..1f63dfaad --- /dev/null +++ b/cli/server/signals_unix.go @@ -0,0 +1,12 @@ +//go:build !windows +// +build !windows + +package server + +import "syscall" + +const ( + sighup = syscall.SIGHUP + sigusr1 = syscall.SIGUSR1 + sigusr2 = syscall.SIGUSR2 +) diff --git a/cli/server/signals_windows.go b/cli/server/signals_windows.go new file mode 100644 index 000000000..4465c1e21 --- /dev/null +++ b/cli/server/signals_windows.go @@ -0,0 +1,13 @@ +//go:build windows +// +build windows + +package server + +import "syscall" + +const ( + // Doesn't really matter, Windows can't do it. + sighup = syscall.SIGHUP + sigusr1 = syscall.Signal(0xa) + sigusr2 = syscall.Signal(0xc) +) From ff93a680eb186674ef72914d962891518fd6f303 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 28 Jul 2022 17:49:30 +0300 Subject: [PATCH 13/14] metrics: don't Panic on bad shutdown Just log the error. --- pkg/services/metrics/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/metrics/metrics.go b/pkg/services/metrics/metrics.go index b3cb4c27b..e204cbe32 100644 --- a/pkg/services/metrics/metrics.go +++ b/pkg/services/metrics/metrics.go @@ -37,6 +37,6 @@ func (ms *Service) ShutDown() { ms.log.Info("shutting down service", zap.String("endpoint", ms.Addr)) err := ms.Shutdown(context.Background()) if err != nil { - ms.log.Panic("can't shut down service") + ms.log.Error("can't shut service down", zap.Error(err)) } } From 9b0ea2c21bc47e65cd1f6cc17d6679fe161279ef Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 28 Jul 2022 18:30:14 +0300 Subject: [PATCH 14/14] network/consensus: always process dBFT messages as high priority Move category definition from consensus to payload, consensus service is the one of its kind (HP), so network.Server can be adjusted accordingly. --- cli/executor_test.go | 2 +- cli/server/server.go | 4 ++-- pkg/consensus/consensus.go | 5 +---- pkg/consensus/recovery_message.go | 2 +- pkg/network/helper_test.go | 1 - pkg/network/payload/extensible.go | 4 ++++ pkg/network/server.go | 20 +++++++------------- pkg/network/server_test.go | 8 ++++---- 8 files changed, 20 insertions(+), 26 deletions(-) diff --git a/cli/executor_test.go b/cli/executor_test.go index b3564baa4..e3b8e81b9 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -151,7 +151,7 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch TimePerBlock: serverConfig.TimePerBlock, }) require.NoError(t, err) - netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) go netSrv.Start(make(chan error, 1)) errCh := make(chan error, 2) rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh) diff --git a/cli/server/server.go b/cli/server/server.go index f40d1405d..0d5f4e02e 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -423,7 +423,7 @@ func mkConsensus(config config.Wallet, tpb time.Duration, chain *core.Blockchain return nil, fmt.Errorf("can't initialize Consensus module: %w", err) } - serv.AddExtensibleHPService(srv, consensus.Category, srv.OnPayload, srv.OnTransaction) + serv.AddConsensusService(srv, srv.OnPayload, srv.OnTransaction) return srv, nil } @@ -606,7 +606,7 @@ Main: } case sigusr2: if dbftSrv != nil { - serv.DelExtensibleHPService(dbftSrv, consensus.Category) + serv.DelConsensusService(dbftSrv) dbftSrv.Shutdown() } dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 977f2cd90..dca2b80e6 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -40,9 +40,6 @@ const defaultTimePerBlock = 15 * time.Second // Number of nanoseconds in millisecond. const nsInMs = 1000000 -// Category is a message category for extensible payloads. -const Category = "dBFT" - // Ledger is the interface to Blockchain sufficient for Service. type Ledger interface { AddBlock(block *coreb.Block) error @@ -218,7 +215,7 @@ var ( func NewPayload(m netmode.Magic, stateRootEnabled bool) *Payload { return &Payload{ Extensible: npayload.Extensible{ - Category: Category, + Category: npayload.ConsensusCategory, }, message: message{ stateRootEnabled: stateRootEnabled, diff --git a/pkg/consensus/recovery_message.go b/pkg/consensus/recovery_message.go index 6ab826d7e..a7fa9c435 100644 --- a/pkg/consensus/recovery_message.go +++ b/pkg/consensus/recovery_message.go @@ -297,7 +297,7 @@ func getVerificationScript(i uint8, validators []crypto.PublicKey) []byte { func fromPayload(t messageType, recovery *Payload, p io.Serializable) *Payload { return &Payload{ Extensible: npayload.Extensible{ - Category: Category, + Category: npayload.ConsensusCategory, ValidBlockEnd: recovery.BlockIndex, }, message: message{ diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index c8847ed00..04b54959a 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -10,7 +10,6 @@ import ( "github.com/nspcc-dev/neo-go/internal/fakechain" "github.com/nspcc-dev/neo-go/pkg/config" - "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" diff --git a/pkg/network/payload/extensible.go b/pkg/network/payload/extensible.go index 1ca3c3c6d..31a909ee9 100644 --- a/pkg/network/payload/extensible.go +++ b/pkg/network/payload/extensible.go @@ -11,6 +11,10 @@ import ( const maxExtensibleCategorySize = 32 +// ConsensusCategory is a message category for consensus-related extensible +// payloads. +const ConsensusCategory = "dBFT" + // Extensible represents a payload containing arbitrary data. type Extensible struct { // Category is the payload type. diff --git a/pkg/network/server.go b/pkg/network/server.go index b69be9ebc..5eceba49b 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -105,7 +105,6 @@ type ( serviceLock sync.RWMutex services map[string]Service extensHandlers map[string]func(*payload.Extensible) error - extensHighPrio string txCallback func(*transaction.Transaction) txInLock sync.Mutex @@ -301,13 +300,12 @@ func (s *Server) addExtensibleService(svc Service, category string, handler func s.addService(svc) } -// AddExtensibleHPService registers a high-priority service that handles an extensible payload of some kind. -func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { +// AddConsensusService registers consensus service that handles transactions and dBFT extensible payloads. +func (s *Server) AddConsensusService(svc Service, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { s.serviceLock.Lock() defer s.serviceLock.Unlock() s.txCallback = txCallback - s.extensHighPrio = category - s.addExtensibleService(svc, category, handler) + s.addExtensibleService(svc, payload.ConsensusCategory, handler) } // DelService drops a service from the list, use it when the service is stopped @@ -337,13 +335,12 @@ func (s *Server) delExtensibleService(svc Service, category string) { s.delService(svc) } -// DelExtensibleHPService unregisters a high-priority service that handles an extensible payload of some kind. -func (s *Server) DelExtensibleHPService(svc Service, category string) { +// DelConsensusService unregisters consensus service that handles transactions and dBFT extensible payloads. +func (s *Server) DelConsensusService(svc Service) { s.serviceLock.Lock() defer s.serviceLock.Unlock() s.txCallback = nil - s.extensHighPrio = "" - s.delExtensibleService(svc, category) + s.delExtensibleService(svc, payload.ConsensusCategory) } // GetNotaryPool allows to retrieve notary pool, if it's configured. @@ -1004,10 +1001,7 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { func (s *Server) advertiseExtensible(e *payload.Extensible) { msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) - s.serviceLock.RLock() - hp := s.extensHighPrio - s.serviceLock.RUnlock() - if e.Category == hp { + if e.Category == payload.ConsensusCategory { // It's high priority because it directly affects consensus process, // even though it's just an inv. s.broadcastHPMessage(msg) diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index c80f4eb4b..586ca48cd 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -111,7 +111,7 @@ func TestServerStartAndShutdown(t *testing.T) { t.Run("with consensus", func(t *testing.T) { s := newTestServer(t, ServerConfig{}) cons := new(fakeConsensus) - s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) ch := startWithChannel(s) p := newLocalPeer(t, s) @@ -413,7 +413,7 @@ func TestBlock(t *testing.T) { func TestConsensus(t *testing.T) { s := newTestServer(t, ServerConfig{}) cons := new(fakeConsensus) - s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) startWithCleanup(t, s) atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) @@ -424,7 +424,7 @@ func TestConsensus(t *testing.T) { newConsensusMessage := func(start, end uint32) *Message { pl := payload.NewExtensible() - pl.Category = consensus.Category + pl.Category = payload.ConsensusCategory pl.ValidBlockStart = start pl.ValidBlockEnd = end return NewMessage(CMDExtensible, pl) @@ -458,7 +458,7 @@ func TestConsensus(t *testing.T) { func TestTransaction(t *testing.T) { s := newTestServer(t, ServerConfig{}) cons := new(fakeConsensus) - s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) startWithCleanup(t, s) t.Run("good", func(t *testing.T) {