diff --git a/cli/executor_test.go b/cli/executor_test.go index f4014844a..978568b09 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -151,9 +151,9 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch require.NoError(t, err) netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) go netSrv.Start(make(chan error, 1)) - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger) errCh := make(chan error, 2) - rpcServer.Start(errCh) + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh) + rpcServer.Start() return chain, &rpcServer, netSrv } @@ -187,7 +187,7 @@ func newExecutorWithConfig(t *testing.T, needChain, runChain bool, f func(*confi func (e *executor) Close(t *testing.T) { input.Terminal = nil if e.RPC != nil { - require.NoError(t, e.RPC.Shutdown()) + e.RPC.Shutdown() } if e.NetSrv != nil { e.NetSrv.Shutdown() diff --git a/cli/server/server.go b/cli/server/server.go index 334e0fec7..1526b52f3 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -504,11 +504,14 @@ func startServer(ctx *cli.Context) error { if err != nil { return cli.NewExitError(err, 1) } - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) errChan := make(chan error) + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) + serv.AddService(&rpcServer) go serv.Start(errChan) - rpcServer.Start(errChan) + if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized { + rpcServer.Start() + } sighupCh := make(chan os.Signal, 1) signal.Notify(sighupCh, syscall.SIGHUP) @@ -528,20 +531,16 @@ Main: switch sig { case syscall.SIGHUP: log.Info("SIGHUP received, restarting rpc-server") - serverErr := rpcServer.Shutdown() - if serverErr != nil { - errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr) - break + rpcServer.Shutdown() + rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) + serv.AddService(&rpcServer) // Replaces old one by service name. + if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { + rpcServer.Start() } - rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) - rpcServer.Start(errChan) } case <-grace.Done(): signal.Stop(sighupCh) serv.Shutdown() - if serverErr := rpcServer.Shutdown(); serverErr != nil { - shutdownErr = fmt.Errorf("error on shutdown: %w", serverErr) - } break Main } } diff --git a/docs/cli.md b/docs/cli.md index dce0bd573..b63b586d1 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -55,6 +55,17 @@ Or specify a different network with appropriate flag like this: By default, the node will run in foreground using current standard output for logging. + +### Node synchronization + +Most of the services (state validation, oracle, consensus and RPC if +configured with `StartWhenSynchronized` option) are only started after the +node is completely synchronizaed because running them before that is either +pointless or even dangerous. The node considers itself to be fully +synchronized with the network if it has more than `MinPeers` neighbours and if +at least 2/3 of them are known to have a height less than or equal to the +current height of the node. + ### Restarting node services To restart some node services without full node restart, send the SIGHUP diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 9591981ba..d8f2dc506 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -136,6 +136,7 @@ RPC: MaxFindResultItems: 100 MaxNEP11Tokens: 100 Port: 10332 + StartWhenSynchronized: false TLSConfig: Address: "" CertFile: serv.crt @@ -158,6 +159,10 @@ where: - `MaxNEP11Tokens` - limit for the number of tokens returned from `getnep11balances` call. - `Port` is an RPC server port it should be bound to. +- `StartWhenSynchronized` controls when RPC server will be started, by default + (`false` setting) it's started immediately and RPC is availabe during node + synchronization. Setting it to `true` will make the node start RPC service only + after full synchronization. - `TLS` section configures TLS protocol. ### State Root Configuration diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 585e34aa8..28c9b4336 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -63,6 +63,8 @@ type Ledger interface { // Service represents consensus instance. type Service interface { + // Name returns service name. + Name() string // Start initializes dBFT and starts event loop for consensus service. // It must be called only when sufficient amount of peers are connected. Start() @@ -256,6 +258,11 @@ func (s *service) newPrepareRequest() payload.PrepareRequest { return r } +// Name returns service name. +func (s *service) Name() string { + return "consensus" +} + func (s *service) Start() { if s.started.CAS(false, true) { s.log.Info("starting consensus service") diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index f3437d912..992ea54c8 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -105,10 +105,13 @@ func (bq *blockQueue) run() { func (bq *blockQueue) putBlock(block *block.Block) error { h := bq.chain.BlockHeight() bq.queueLock.Lock() + defer bq.queueLock.Unlock() + if bq.discarded.Load() { + return nil + } if block.Index <= h || h+blockCacheSize < block.Index { // can easily happen when fetching the same blocks from // different peers, thus not considered as error - bq.queueLock.Unlock() return nil } pos := indexToPosition(block.Index) @@ -122,7 +125,6 @@ func (bq *blockQueue) putBlock(block *block.Block) error { } } l := bq.len - bq.queueLock.Unlock() // update metrics updateBlockQueueLenMetric(l) select { @@ -142,8 +144,8 @@ func (bq *blockQueue) lastQueued() uint32 { func (bq *blockQueue) discard() { if bq.discarded.CAS(false, true) { - close(bq.checkBlocks) bq.queueLock.Lock() + close(bq.checkBlocks) // Technically we could bq.queue = nil, but this would cost // another if in run(). for i := 0; i < len(bq.queue); i++ { diff --git a/pkg/network/server.go b/pkg/network/server.go index 69bce8051..b7c6bb342 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -75,6 +75,7 @@ type ( // Service is a service abstraction (oracle, state root, consensus, etc). Service interface { + Name() string Start() Shutdown() } @@ -100,7 +101,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer - services []Service + services map[string]Service extensHandlers map[string]func(*payload.Extensible) error extensHighPrio string txCallback func(*transaction.Transaction) @@ -177,6 +178,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), log: log, transactions: make(chan *transaction.Transaction, 64), + services: make(map[string]Service), extensHandlers: make(map[string]func(*payload.Extensible) error), stateSync: stSync, } @@ -270,7 +272,7 @@ func (s *Server) Shutdown() { // AddService allows to add a service to be started/stopped by Server. func (s *Server) AddService(svc Service) { - s.services = append(s.services, svc) + s.services[svc.Name()] = svc } // AddExtensibleService register a service that handles extensible payload of some kind. diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 0624b157b..6c1af736f 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -38,8 +38,9 @@ type fakeConsensus struct { var _ consensus.Service = (*fakeConsensus)(nil) -func (f *fakeConsensus) Start() { f.started.Store(true) } -func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } +func (f *fakeConsensus) Name() string { return "fake" } +func (f *fakeConsensus) Start() { f.started.Store(true) } +func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { f.payloads = append(f.payloads, p) return nil @@ -114,12 +115,12 @@ func TestServerStartAndShutdown(t *testing.T) { p := newLocalPeer(t, s) s.register <- p - assert.True(t, s.services[0].(*fakeConsensus).started.Load()) + assert.True(t, s.services["fake"].(*fakeConsensus).started.Load()) s.Shutdown() <-ch - require.True(t, s.services[0].(*fakeConsensus).stopped.Load()) + require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load()) }) } @@ -431,13 +432,13 @@ func TestConsensus(t *testing.T) { s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil } require.NoError(t, s.handleMessage(p, msg)) - require.Contains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.Contains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) t.Run("small ValidUntilBlockEnd", func(t *testing.T) { t.Run("current height", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()) require.NoError(t, s.handleMessage(p, msg)) - require.NotContains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.NotContains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) }) t.Run("invalid", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()-1) @@ -468,13 +469,13 @@ func TestTransaction(t *testing.T) { s.register <- p s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) + require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) }) t.Run("bad", func(t *testing.T) { tx := newDummyTx() s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) // Consensus receives everything. + require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything. }) } diff --git a/pkg/rpc/rpc_config.go b/pkg/rpc/rpc_config.go index 6e241701f..cc0cdee9b 100644 --- a/pkg/rpc/rpc_config.go +++ b/pkg/rpc/rpc_config.go @@ -17,6 +17,7 @@ type ( MaxFindResultItems int `yaml:"MaxFindResultItems"` MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"` Port uint16 `yaml:"Port"` + StartWhenSynchronized bool `yaml:"StartWhenSynchronized"` TLSConfig TLSConfig `yaml:"TLSConfig"` } diff --git a/pkg/rpc/server/client_test.go b/pkg/rpc/server/client_test.go index a8d9f3fb8..dc1ae0c5e 100644 --- a/pkg/rpc/server/client_test.go +++ b/pkg/rpc/server/client_test.go @@ -32,7 +32,7 @@ import ( func TestClient_NEP17(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -75,7 +75,7 @@ func TestClient_NEP17(t *testing.T) { func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() const extraFee = 10 var nonce uint32 @@ -324,7 +324,7 @@ func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) { func TestCalculateNetworkFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() const extraFee = 10 c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) @@ -394,7 +394,7 @@ func TestCalculateNetworkFee(t *testing.T) { func TestSignAndPushInvocationTx(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -551,7 +551,7 @@ func TestSignAndPushInvocationTx(t *testing.T) { func TestSignAndPushP2PNotaryRequest(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -642,7 +642,7 @@ func TestSignAndPushP2PNotaryRequest(t *testing.T) { func TestCalculateNotaryFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -662,7 +662,7 @@ func TestPing(t *testing.T) { require.NoError(t, c.Init()) require.NoError(t, c.Ping()) - require.NoError(t, rpcSrv.Shutdown()) + rpcSrv.Shutdown() httpSrv.Close() require.Error(t, c.Ping()) } @@ -670,7 +670,7 @@ func TestPing(t *testing.T) { func TestCreateTxFromScript(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -699,7 +699,7 @@ func TestCreateTxFromScript(t *testing.T) { func TestCreateNEP17TransferTx(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -749,7 +749,7 @@ func TestCreateNEP17TransferTx(t *testing.T) { func TestInvokeVerify(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -791,7 +791,7 @@ func TestInvokeVerify(t *testing.T) { func TestClient_GetNativeContracts(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -805,7 +805,7 @@ func TestClient_GetNativeContracts(t *testing.T) { func TestClient_NEP11_ND(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -871,7 +871,7 @@ func TestClient_NEP11_ND(t *testing.T) { func TestClient_NEP11_D(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -938,7 +938,7 @@ func TestClient_NEP11_D(t *testing.T) { func TestClient_NNS(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -987,7 +987,7 @@ func TestClient_NNS(t *testing.T) { func TestClient_GetNotaryServiceFeePerKey(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -1002,7 +1002,7 @@ func TestClient_GetNotaryServiceFeePerKey(t *testing.T) { func TestClient_GetOraclePrice(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index bcd0ea09f..d0f3062a2 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -50,6 +50,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -66,6 +67,8 @@ type ( log *zap.Logger https *http.Server shutdown chan struct{} + started *atomic.Bool + errChan chan error subsLock sync.RWMutex subscribers map[*subscriber]bool @@ -162,7 +165,7 @@ var upgrader = websocket.Upgrader{} // New creates a new Server struct. func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server, - orc *oracle.Oracle, log *zap.Logger) Server { + orc *oracle.Oracle, log *zap.Logger, errChan chan error) Server { httpServer := &http.Server{ Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), } @@ -188,6 +191,8 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S oracle: orc, https: tlsServer, shutdown: make(chan struct{}), + started: atomic.NewBool(false), + errChan: errChan, subscribers: make(map[*subscriber]bool), // These are NOT buffered to preserve original order of events. @@ -199,14 +204,22 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S } } -// Start creates a new JSON-RPC server listening on the configured port. It's -// supposed to be run as a separate goroutine (like http.Server's Serve) and it -// returns its errors via given errChan. -func (s *Server) Start(errChan chan error) { +// Name returns service name. +func (s *Server) Name() string { + return "rpc" +} + +// Start creates a new JSON-RPC server listening on the configured port. It creates +// goroutines needed internally and it returns its errors via errChan passed to New(). +func (s *Server) Start() { if !s.config.Enabled { s.log.Info("RPC server is not enabled") return } + if !s.started.CAS(false, true) { + s.log.Info("RPC server already started") + return + } s.Handler = http.HandlerFunc(s.handleHTTPRequest) s.log.Info("starting rpc-server", zap.String("endpoint", s.Addr)) @@ -217,20 +230,20 @@ func (s *Server) Start(errChan chan error) { go func() { ln, err := net.Listen("tcp", s.https.Addr) if err != nil { - errChan <- err + s.errChan <- err return } s.https.Addr = ln.Addr().String() err = s.https.ServeTLS(ln, cfg.CertFile, cfg.KeyFile) if err != http.ErrServerClosed { s.log.Error("failed to start TLS RPC server", zap.Error(err)) - errChan <- err + s.errChan <- err } }() } ln, err := net.Listen("tcp", s.Addr) if err != nil { - errChan <- err + s.errChan <- err return } s.Addr = ln.Addr().String() // set Addr to the actual address @@ -238,34 +251,35 @@ func (s *Server) Start(errChan chan error) { err = s.Serve(ln) if err != http.ErrServerClosed { s.log.Error("failed to start RPC server", zap.Error(err)) - errChan <- err + s.errChan <- err } }() } -// Shutdown overrides the http.Server Shutdown -// method. -func (s *Server) Shutdown() error { - var httpsErr error - +// Shutdown stops the RPC server. It can only be called once. +func (s *Server) Shutdown() { + if !s.started.Load() { + return + } // Signal to websocket writer routines and handleSubEvents. close(s.shutdown) if s.config.TLSConfig.Enabled { - s.log.Info("shutting down rpc-server (https)", zap.String("endpoint", s.https.Addr)) - httpsErr = s.https.Shutdown(context.Background()) + s.log.Info("shutting down RPC server (https)", zap.String("endpoint", s.https.Addr)) + err := s.https.Shutdown(context.Background()) + if err != nil { + s.log.Warn("error during RPC (https) server shutdown", zap.Error(err)) + } } - s.log.Info("shutting down rpc-server", zap.String("endpoint", s.Addr)) + s.log.Info("shutting down RPC server", zap.String("endpoint", s.Addr)) err := s.Server.Shutdown(context.Background()) + if err != nil { + s.log.Warn("error during RPC (http) server shutdown", zap.Error(err)) + } // Wait for handleSubEvents to finish. <-s.executionCh - - if err == nil { - return httpsErr - } - return err } func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) { diff --git a/pkg/rpc/server/server_helper_test.go b/pkg/rpc/server/server_helper_test.go index 0c535ca71..4ea76cc46 100644 --- a/pkg/rpc/server/server_helper_test.go +++ b/pkg/rpc/server/server_helper_test.go @@ -104,9 +104,9 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool) serverConfig.Port = 0 server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) require.NoError(t, err) - rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) errCh := make(chan error, 2) - rpcServer.Start(errCh) + rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, errCh) + rpcServer.Start() handler := http.HandlerFunc(rpcServer.handleHTTPRequest) srv := httptest.NewServer(handler) diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 96d79ebf9..0b28ab710 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -1304,7 +1304,7 @@ func TestRPC(t *testing.T) { func TestSubmitOracle(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, true, false) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitoracleresponse", "params": %s}` runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { @@ -1340,7 +1340,7 @@ func TestSubmitNotaryRequest(t *testing.T) { t.Run("disabled P2PSigExtensions", func(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() req := fmt.Sprintf(rpc, "[]") body := doRPCCallOverHTTP(req, httpSrv.URL, t) checkErrGetResult(t, body, true) @@ -1348,7 +1348,7 @@ func TestSubmitNotaryRequest(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { return func(t *testing.T) { @@ -1459,7 +1459,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) [] chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() e := &executor{chain: chain, httpSrv: httpSrv} t.Run("single request", func(t *testing.T) { @@ -2608,7 +2608,7 @@ func BenchmarkHandleIn(b *testing.B) { serverConfig.LogLevel = zapcore.FatalLevel server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) require.NoError(b, err) - rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) + rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, make(chan error)) defer chain.Close() do := func(b *testing.B, req []byte) { diff --git a/pkg/rpc/server/subscription_test.go b/pkg/rpc/server/subscription_test.go index 3cd0cfd98..bd90cc10c 100644 --- a/pkg/rpc/server/subscription_test.go +++ b/pkg/rpc/server/subscription_test.go @@ -94,7 +94,7 @@ func TestSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() go rpcSrv.coreServer.Start(make(chan error)) defer rpcSrv.coreServer.Shutdown() @@ -261,7 +261,7 @@ func TestFilteredSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() // It's used as an end-of-event-stream, so it's always present. blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`) @@ -353,7 +353,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { go rpcSrv.coreServer.Start(make(chan error, 1)) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() // blocks are needed to make GAS deposit for priv0 blocks := getTestBlocks(t) @@ -395,7 +395,7 @@ func TestFilteredBlockSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`) @@ -433,7 +433,7 @@ func TestMaxSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() for i := 0; i < maxFeeds+1; i++ { var s string @@ -479,7 +479,7 @@ func TestBadSubUnsub(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() testF := func(t *testing.T, cases map[string]string) func(t *testing.T) { return func(t *testing.T) { @@ -513,7 +513,7 @@ func doSomeWSRequest(t *testing.T, ws *websocket.Conn) { func TestWSClientsLimit(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() dialer := websocket.Dialer{HandshakeTimeout: time.Second} url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" @@ -552,7 +552,7 @@ func TestSubscriptionOverflow(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs) require.Nil(t, resp.Error) diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index f6537ac5b..66dd24e78 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -152,6 +152,11 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu }, nil } +// Name returns service name. +func (n *Notary) Name() string { + return "notary" +} + // Start runs Notary module in a separate goroutine. func (n *Notary) Start() { n.Config.Log.Info("starting notary service") diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 5165ba5c5..a67a6f1e2 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -174,6 +174,11 @@ func NewOracle(cfg Config) (*Oracle, error) { return o, nil } +// Name returns service name. +func (o *Oracle) Name() string { + return "oracle" +} + // Shutdown shutdowns Oracle. func (o *Oracle) Shutdown() { close(o.close) diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 8da2ae13d..e643d65bf 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -28,6 +28,7 @@ type ( // Service represents state root service. Service interface { + Name() string OnPayload(p *payload.Extensible) error AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 8c934590f..1b8619a64 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -17,6 +17,11 @@ const ( firstVoteResendDelay = 3 * time.Second ) +// Name returns service name. +func (s *service) Name() string { + return "stateroot" +} + // Start runs service instance in a separate goroutine. func (s *service) Start() { s.log.Info("starting state validation service")