diff --git a/cli/executor_test.go b/cli/executor_test.go index f4014844a..978568b09 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -151,9 +151,9 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch require.NoError(t, err) netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) go netSrv.Start(make(chan error, 1)) - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger) errCh := make(chan error, 2) - rpcServer.Start(errCh) + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh) + rpcServer.Start() return chain, &rpcServer, netSrv } @@ -187,7 +187,7 @@ func newExecutorWithConfig(t *testing.T, needChain, runChain bool, f func(*confi func (e *executor) Close(t *testing.T) { input.Terminal = nil if e.RPC != nil { - require.NoError(t, e.RPC.Shutdown()) + e.RPC.Shutdown() } if e.NetSrv != nil { e.NetSrv.Shutdown() diff --git a/cli/server/server.go b/cli/server/server.go index 334e0fec7..4700c49ae 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -504,11 +504,11 @@ func startServer(ctx *cli.Context) error { if err != nil { return cli.NewExitError(err, 1) } - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) errChan := make(chan error) + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) go serv.Start(errChan) - rpcServer.Start(errChan) + rpcServer.Start() sighupCh := make(chan os.Signal, 1) signal.Notify(sighupCh, syscall.SIGHUP) @@ -528,20 +528,14 @@ Main: switch sig { case syscall.SIGHUP: log.Info("SIGHUP received, restarting rpc-server") - serverErr := rpcServer.Shutdown() - if serverErr != nil { - errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr) - break - } - rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) - rpcServer.Start(errChan) + rpcServer.Shutdown() + rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) + rpcServer.Start() } case <-grace.Done(): signal.Stop(sighupCh) serv.Shutdown() - if serverErr := rpcServer.Shutdown(); serverErr != nil { - shutdownErr = fmt.Errorf("error on shutdown: %w", serverErr) - } + rpcServer.Shutdown() break Main } } diff --git a/pkg/rpc/server/client_test.go b/pkg/rpc/server/client_test.go index a8d9f3fb8..dc1ae0c5e 100644 --- a/pkg/rpc/server/client_test.go +++ b/pkg/rpc/server/client_test.go @@ -32,7 +32,7 @@ import ( func TestClient_NEP17(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -75,7 +75,7 @@ func TestClient_NEP17(t *testing.T) { func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() const extraFee = 10 var nonce uint32 @@ -324,7 +324,7 @@ func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) { func TestCalculateNetworkFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() const extraFee = 10 c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) @@ -394,7 +394,7 @@ func TestCalculateNetworkFee(t *testing.T) { func TestSignAndPushInvocationTx(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -551,7 +551,7 @@ func TestSignAndPushInvocationTx(t *testing.T) { func TestSignAndPushP2PNotaryRequest(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -642,7 +642,7 @@ func TestSignAndPushP2PNotaryRequest(t *testing.T) { func TestCalculateNotaryFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -662,7 +662,7 @@ func TestPing(t *testing.T) { require.NoError(t, c.Init()) require.NoError(t, c.Ping()) - require.NoError(t, rpcSrv.Shutdown()) + rpcSrv.Shutdown() httpSrv.Close() require.Error(t, c.Ping()) } @@ -670,7 +670,7 @@ func TestPing(t *testing.T) { func TestCreateTxFromScript(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -699,7 +699,7 @@ func TestCreateTxFromScript(t *testing.T) { func TestCreateNEP17TransferTx(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -749,7 +749,7 @@ func TestCreateNEP17TransferTx(t *testing.T) { func TestInvokeVerify(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -791,7 +791,7 @@ func TestInvokeVerify(t *testing.T) { func TestClient_GetNativeContracts(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -805,7 +805,7 @@ func TestClient_GetNativeContracts(t *testing.T) { func TestClient_NEP11_ND(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -871,7 +871,7 @@ func TestClient_NEP11_ND(t *testing.T) { func TestClient_NEP11_D(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -938,7 +938,7 @@ func TestClient_NEP11_D(t *testing.T) { func TestClient_NNS(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -987,7 +987,7 @@ func TestClient_NNS(t *testing.T) { func TestClient_GetNotaryServiceFeePerKey(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -1002,7 +1002,7 @@ func TestClient_GetNotaryServiceFeePerKey(t *testing.T) { func TestClient_GetOraclePrice(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 07410775b..287918ff7 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -68,6 +68,7 @@ type ( https *http.Server shutdown chan struct{} started *atomic.Bool + errChan chan error subsLock sync.RWMutex subscribers map[*subscriber]bool @@ -164,7 +165,7 @@ var upgrader = websocket.Upgrader{} // New creates a new Server struct. func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server, - orc *oracle.Oracle, log *zap.Logger) Server { + orc *oracle.Oracle, log *zap.Logger, errChan chan error) Server { httpServer := &http.Server{ Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), } @@ -191,6 +192,7 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S https: tlsServer, shutdown: make(chan struct{}), started: atomic.NewBool(false), + errChan: errChan, subscribers: make(map[*subscriber]bool), // These are NOT buffered to preserve original order of events. @@ -202,10 +204,9 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S } } -// Start creates a new JSON-RPC server listening on the configured port. It's -// supposed to be run as a separate goroutine (like http.Server's Serve) and it -// returns its errors via given errChan. -func (s *Server) Start(errChan chan error) { +// Start creates a new JSON-RPC server listening on the configured port. It creates +// goroutines needed internally and it returns its errors via errChan passed to New(). +func (s *Server) Start() { if !s.config.Enabled { s.log.Info("RPC server is not enabled") return @@ -224,20 +225,20 @@ func (s *Server) Start(errChan chan error) { go func() { ln, err := net.Listen("tcp", s.https.Addr) if err != nil { - errChan <- err + s.errChan <- err return } s.https.Addr = ln.Addr().String() err = s.https.ServeTLS(ln, cfg.CertFile, cfg.KeyFile) if err != http.ErrServerClosed { s.log.Error("failed to start TLS RPC server", zap.Error(err)) - errChan <- err + s.errChan <- err } }() } ln, err := net.Listen("tcp", s.Addr) if err != nil { - errChan <- err + s.errChan <- err return } s.Addr = ln.Addr().String() // set Addr to the actual address @@ -245,34 +246,35 @@ func (s *Server) Start(errChan chan error) { err = s.Serve(ln) if err != http.ErrServerClosed { s.log.Error("failed to start RPC server", zap.Error(err)) - errChan <- err + s.errChan <- err } }() } -// Shutdown overrides the http.Server Shutdown -// method. -func (s *Server) Shutdown() error { +// Shutdown stops the RPC server. It can only be called once. +func (s *Server) Shutdown() { var httpsErr error // Signal to websocket writer routines and handleSubEvents. close(s.shutdown) if s.config.TLSConfig.Enabled { - s.log.Info("shutting down rpc-server (https)", zap.String("endpoint", s.https.Addr)) + s.log.Info("shutting down RPC server (https)", zap.String("endpoint", s.https.Addr)) httpsErr = s.https.Shutdown(context.Background()) } - s.log.Info("shutting down rpc-server", zap.String("endpoint", s.Addr)) + s.log.Info("shutting down RPC server", zap.String("endpoint", s.Addr)) err := s.Server.Shutdown(context.Background()) // Wait for handleSubEvents to finish. <-s.executionCh - if err == nil { - return httpsErr + if httpsErr != nil { + s.errChan <- httpsErr + } + if err != nil { + s.errChan <- err } - return err } func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) { diff --git a/pkg/rpc/server/server_helper_test.go b/pkg/rpc/server/server_helper_test.go index 0c535ca71..4ea76cc46 100644 --- a/pkg/rpc/server/server_helper_test.go +++ b/pkg/rpc/server/server_helper_test.go @@ -104,9 +104,9 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool) serverConfig.Port = 0 server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) require.NoError(t, err) - rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) errCh := make(chan error, 2) - rpcServer.Start(errCh) + rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, errCh) + rpcServer.Start() handler := http.HandlerFunc(rpcServer.handleHTTPRequest) srv := httptest.NewServer(handler) diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 96d79ebf9..0b28ab710 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -1304,7 +1304,7 @@ func TestRPC(t *testing.T) { func TestSubmitOracle(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, true, false) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitoracleresponse", "params": %s}` runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { @@ -1340,7 +1340,7 @@ func TestSubmitNotaryRequest(t *testing.T) { t.Run("disabled P2PSigExtensions", func(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() req := fmt.Sprintf(rpc, "[]") body := doRPCCallOverHTTP(req, httpSrv.URL, t) checkErrGetResult(t, body, true) @@ -1348,7 +1348,7 @@ func TestSubmitNotaryRequest(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { return func(t *testing.T) { @@ -1459,7 +1459,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) [] chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() e := &executor{chain: chain, httpSrv: httpSrv} t.Run("single request", func(t *testing.T) { @@ -2608,7 +2608,7 @@ func BenchmarkHandleIn(b *testing.B) { serverConfig.LogLevel = zapcore.FatalLevel server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) require.NoError(b, err) - rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) + rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, make(chan error)) defer chain.Close() do := func(b *testing.B, req []byte) { diff --git a/pkg/rpc/server/subscription_test.go b/pkg/rpc/server/subscription_test.go index 3cd0cfd98..bd90cc10c 100644 --- a/pkg/rpc/server/subscription_test.go +++ b/pkg/rpc/server/subscription_test.go @@ -94,7 +94,7 @@ func TestSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() go rpcSrv.coreServer.Start(make(chan error)) defer rpcSrv.coreServer.Shutdown() @@ -261,7 +261,7 @@ func TestFilteredSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() // It's used as an end-of-event-stream, so it's always present. blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`) @@ -353,7 +353,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { go rpcSrv.coreServer.Start(make(chan error, 1)) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() // blocks are needed to make GAS deposit for priv0 blocks := getTestBlocks(t) @@ -395,7 +395,7 @@ func TestFilteredBlockSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`) @@ -433,7 +433,7 @@ func TestMaxSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() for i := 0; i < maxFeeds+1; i++ { var s string @@ -479,7 +479,7 @@ func TestBadSubUnsub(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() testF := func(t *testing.T, cases map[string]string) func(t *testing.T) { return func(t *testing.T) { @@ -513,7 +513,7 @@ func doSomeWSRequest(t *testing.T, ws *websocket.Conn) { func TestWSClientsLimit(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() dialer := websocket.Dialer{HandshakeTimeout: time.Second} url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" @@ -552,7 +552,7 @@ func TestSubscriptionOverflow(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs) require.Nil(t, resp.Error)