From 4eee2f930e1e207f5f7211599f27455eb8a540a9 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 22 Apr 2022 10:33:52 +0300 Subject: [PATCH 1/6] rpc/server: make double-start a no-op --- pkg/rpc/server/server.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index bcd0ea09f..07410775b 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -50,6 +50,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -66,6 +67,7 @@ type ( log *zap.Logger https *http.Server shutdown chan struct{} + started *atomic.Bool subsLock sync.RWMutex subscribers map[*subscriber]bool @@ -188,6 +190,7 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S oracle: orc, https: tlsServer, shutdown: make(chan struct{}), + started: atomic.NewBool(false), subscribers: make(map[*subscriber]bool), // These are NOT buffered to preserve original order of events. @@ -207,6 +210,10 @@ func (s *Server) Start(errChan chan error) { s.log.Info("RPC server is not enabled") return } + if !s.started.CAS(false, true) { + s.log.Info("RPC server already started") + return + } s.Handler = http.HandlerFunc(s.handleHTTPRequest) s.log.Info("starting rpc-server", zap.String("endpoint", s.Addr)) From a10b1ad32d1855c30265d9591d66e370e13deca5 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 22 Apr 2022 10:49:06 +0300 Subject: [PATCH 2/6] rpc/server: make Server conform network.Service interface With Start() and Shutdown() taking no parameters and returning no values. --- cli/executor_test.go | 6 ++--- cli/server/server.go | 18 +++++--------- pkg/rpc/server/client_test.go | 32 ++++++++++++------------- pkg/rpc/server/server.go | 36 +++++++++++++++------------- pkg/rpc/server/server_helper_test.go | 4 ++-- pkg/rpc/server/server_test.go | 10 ++++---- pkg/rpc/server/subscription_test.go | 16 ++++++------- 7 files changed, 59 insertions(+), 63 deletions(-) diff --git a/cli/executor_test.go b/cli/executor_test.go index f4014844a..978568b09 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -151,9 +151,9 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch require.NoError(t, err) netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) go netSrv.Start(make(chan error, 1)) - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger) errCh := make(chan error, 2) - rpcServer.Start(errCh) + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh) + rpcServer.Start() return chain, &rpcServer, netSrv } @@ -187,7 +187,7 @@ func newExecutorWithConfig(t *testing.T, needChain, runChain bool, f func(*confi func (e *executor) Close(t *testing.T) { input.Terminal = nil if e.RPC != nil { - require.NoError(t, e.RPC.Shutdown()) + e.RPC.Shutdown() } if e.NetSrv != nil { e.NetSrv.Shutdown() diff --git a/cli/server/server.go b/cli/server/server.go index 334e0fec7..4700c49ae 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -504,11 +504,11 @@ func startServer(ctx *cli.Context) error { if err != nil { return cli.NewExitError(err, 1) } - rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) errChan := make(chan error) + rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) go serv.Start(errChan) - rpcServer.Start(errChan) + rpcServer.Start() sighupCh := make(chan os.Signal, 1) signal.Notify(sighupCh, syscall.SIGHUP) @@ -528,20 +528,14 @@ Main: switch sig { case syscall.SIGHUP: log.Info("SIGHUP received, restarting rpc-server") - serverErr := rpcServer.Shutdown() - if serverErr != nil { - errChan <- fmt.Errorf("error while restarting rpc-server: %w", serverErr) - break - } - rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log) - rpcServer.Start(errChan) + rpcServer.Shutdown() + rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) + rpcServer.Start() } case <-grace.Done(): signal.Stop(sighupCh) serv.Shutdown() - if serverErr := rpcServer.Shutdown(); serverErr != nil { - shutdownErr = fmt.Errorf("error on shutdown: %w", serverErr) - } + rpcServer.Shutdown() break Main } } diff --git a/pkg/rpc/server/client_test.go b/pkg/rpc/server/client_test.go index a8d9f3fb8..dc1ae0c5e 100644 --- a/pkg/rpc/server/client_test.go +++ b/pkg/rpc/server/client_test.go @@ -32,7 +32,7 @@ import ( func TestClient_NEP17(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -75,7 +75,7 @@ func TestClient_NEP17(t *testing.T) { func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() const extraFee = 10 var nonce uint32 @@ -324,7 +324,7 @@ func TestAddNetworkFeeCalculateNetworkFee(t *testing.T) { func TestCalculateNetworkFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() const extraFee = 10 c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) @@ -394,7 +394,7 @@ func TestCalculateNetworkFee(t *testing.T) { func TestSignAndPushInvocationTx(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -551,7 +551,7 @@ func TestSignAndPushInvocationTx(t *testing.T) { func TestSignAndPushP2PNotaryRequest(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -642,7 +642,7 @@ func TestSignAndPushP2PNotaryRequest(t *testing.T) { func TestCalculateNotaryFee(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -662,7 +662,7 @@ func TestPing(t *testing.T) { require.NoError(t, c.Init()) require.NoError(t, c.Ping()) - require.NoError(t, rpcSrv.Shutdown()) + rpcSrv.Shutdown() httpSrv.Close() require.Error(t, c.Ping()) } @@ -670,7 +670,7 @@ func TestPing(t *testing.T) { func TestCreateTxFromScript(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -699,7 +699,7 @@ func TestCreateTxFromScript(t *testing.T) { func TestCreateNEP17TransferTx(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -749,7 +749,7 @@ func TestCreateNEP17TransferTx(t *testing.T) { func TestInvokeVerify(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -791,7 +791,7 @@ func TestInvokeVerify(t *testing.T) { func TestClient_GetNativeContracts(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -805,7 +805,7 @@ func TestClient_GetNativeContracts(t *testing.T) { func TestClient_NEP11_ND(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -871,7 +871,7 @@ func TestClient_NEP11_ND(t *testing.T) { func TestClient_NEP11_D(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -938,7 +938,7 @@ func TestClient_NEP11_D(t *testing.T) { func TestClient_NNS(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -987,7 +987,7 @@ func TestClient_NNS(t *testing.T) { func TestClient_GetNotaryServiceFeePerKey(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) @@ -1002,7 +1002,7 @@ func TestClient_GetNotaryServiceFeePerKey(t *testing.T) { func TestClient_GetOraclePrice(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() c, err := client.New(context.Background(), httpSrv.URL, client.Options{}) require.NoError(t, err) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 07410775b..287918ff7 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -68,6 +68,7 @@ type ( https *http.Server shutdown chan struct{} started *atomic.Bool + errChan chan error subsLock sync.RWMutex subscribers map[*subscriber]bool @@ -164,7 +165,7 @@ var upgrader = websocket.Upgrader{} // New creates a new Server struct. func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.Server, - orc *oracle.Oracle, log *zap.Logger) Server { + orc *oracle.Oracle, log *zap.Logger, errChan chan error) Server { httpServer := &http.Server{ Addr: conf.Address + ":" + strconv.FormatUint(uint64(conf.Port), 10), } @@ -191,6 +192,7 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S https: tlsServer, shutdown: make(chan struct{}), started: atomic.NewBool(false), + errChan: errChan, subscribers: make(map[*subscriber]bool), // These are NOT buffered to preserve original order of events. @@ -202,10 +204,9 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S } } -// Start creates a new JSON-RPC server listening on the configured port. It's -// supposed to be run as a separate goroutine (like http.Server's Serve) and it -// returns its errors via given errChan. -func (s *Server) Start(errChan chan error) { +// Start creates a new JSON-RPC server listening on the configured port. It creates +// goroutines needed internally and it returns its errors via errChan passed to New(). +func (s *Server) Start() { if !s.config.Enabled { s.log.Info("RPC server is not enabled") return @@ -224,20 +225,20 @@ func (s *Server) Start(errChan chan error) { go func() { ln, err := net.Listen("tcp", s.https.Addr) if err != nil { - errChan <- err + s.errChan <- err return } s.https.Addr = ln.Addr().String() err = s.https.ServeTLS(ln, cfg.CertFile, cfg.KeyFile) if err != http.ErrServerClosed { s.log.Error("failed to start TLS RPC server", zap.Error(err)) - errChan <- err + s.errChan <- err } }() } ln, err := net.Listen("tcp", s.Addr) if err != nil { - errChan <- err + s.errChan <- err return } s.Addr = ln.Addr().String() // set Addr to the actual address @@ -245,34 +246,35 @@ func (s *Server) Start(errChan chan error) { err = s.Serve(ln) if err != http.ErrServerClosed { s.log.Error("failed to start RPC server", zap.Error(err)) - errChan <- err + s.errChan <- err } }() } -// Shutdown overrides the http.Server Shutdown -// method. -func (s *Server) Shutdown() error { +// Shutdown stops the RPC server. It can only be called once. +func (s *Server) Shutdown() { var httpsErr error // Signal to websocket writer routines and handleSubEvents. close(s.shutdown) if s.config.TLSConfig.Enabled { - s.log.Info("shutting down rpc-server (https)", zap.String("endpoint", s.https.Addr)) + s.log.Info("shutting down RPC server (https)", zap.String("endpoint", s.https.Addr)) httpsErr = s.https.Shutdown(context.Background()) } - s.log.Info("shutting down rpc-server", zap.String("endpoint", s.Addr)) + s.log.Info("shutting down RPC server", zap.String("endpoint", s.Addr)) err := s.Server.Shutdown(context.Background()) // Wait for handleSubEvents to finish. <-s.executionCh - if err == nil { - return httpsErr + if httpsErr != nil { + s.errChan <- httpsErr + } + if err != nil { + s.errChan <- err } - return err } func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) { diff --git a/pkg/rpc/server/server_helper_test.go b/pkg/rpc/server/server_helper_test.go index 0c535ca71..4ea76cc46 100644 --- a/pkg/rpc/server/server_helper_test.go +++ b/pkg/rpc/server/server_helper_test.go @@ -104,9 +104,9 @@ func initClearServerWithServices(t testing.TB, needOracle bool, needNotary bool) serverConfig.Port = 0 server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) require.NoError(t, err) - rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) errCh := make(chan error, 2) - rpcServer.Start(errCh) + rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, errCh) + rpcServer.Start() handler := http.HandlerFunc(rpcServer.handleHTTPRequest) srv := httptest.NewServer(handler) diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 96d79ebf9..0b28ab710 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -1304,7 +1304,7 @@ func TestRPC(t *testing.T) { func TestSubmitOracle(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, true, false) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitoracleresponse", "params": %s}` runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { @@ -1340,7 +1340,7 @@ func TestSubmitNotaryRequest(t *testing.T) { t.Run("disabled P2PSigExtensions", func(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() req := fmt.Sprintf(rpc, "[]") body := doRPCCallOverHTTP(req, httpSrv.URL, t) checkErrGetResult(t, body, true) @@ -1348,7 +1348,7 @@ func TestSubmitNotaryRequest(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChainAndServices(t, false, true) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() runCase := func(t *testing.T, fail bool, params ...string) func(t *testing.T) { return func(t *testing.T) { @@ -1459,7 +1459,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) [] chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() e := &executor{chain: chain, httpSrv: httpSrv} t.Run("single request", func(t *testing.T) { @@ -2608,7 +2608,7 @@ func BenchmarkHandleIn(b *testing.B) { serverConfig.LogLevel = zapcore.FatalLevel server, err := network.NewServer(serverConfig, chain, chain.GetStateSyncModule(), logger) require.NoError(b, err) - rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger) + rpcServer := New(chain, cfg.ApplicationConfiguration.RPC, server, orc, logger, make(chan error)) defer chain.Close() do := func(b *testing.B, req []byte) { diff --git a/pkg/rpc/server/subscription_test.go b/pkg/rpc/server/subscription_test.go index 3cd0cfd98..bd90cc10c 100644 --- a/pkg/rpc/server/subscription_test.go +++ b/pkg/rpc/server/subscription_test.go @@ -94,7 +94,7 @@ func TestSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() go rpcSrv.coreServer.Start(make(chan error)) defer rpcSrv.coreServer.Shutdown() @@ -261,7 +261,7 @@ func TestFilteredSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() // It's used as an end-of-event-stream, so it's always present. blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`) @@ -353,7 +353,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { go rpcSrv.coreServer.Start(make(chan error, 1)) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() // blocks are needed to make GAS deposit for priv0 blocks := getTestBlocks(t) @@ -395,7 +395,7 @@ func TestFilteredBlockSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`) @@ -433,7 +433,7 @@ func TestMaxSubscriptions(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() for i := 0; i < maxFeeds+1; i++ { var s string @@ -479,7 +479,7 @@ func TestBadSubUnsub(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() testF := func(t *testing.T, cases map[string]string) func(t *testing.T) { return func(t *testing.T) { @@ -513,7 +513,7 @@ func doSomeWSRequest(t *testing.T, ws *websocket.Conn) { func TestWSClientsLimit(t *testing.T) { chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() dialer := websocket.Dialer{HandshakeTimeout: time.Second} url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" @@ -552,7 +552,7 @@ func TestSubscriptionOverflow(t *testing.T) { chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() - defer func() { _ = rpcSrv.Shutdown() }() + defer rpcSrv.Shutdown() resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs) require.Nil(t, resp.Error) From 2593bb0535da28ca508ad7948bc5634a1933c22c Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 22 Apr 2022 11:33:56 +0300 Subject: [PATCH 3/6] network: extend Service with Name, use it to distinguish services --- pkg/consensus/consensus.go | 7 +++++++ pkg/network/server.go | 6 ++++-- pkg/network/server_test.go | 17 +++++++++-------- pkg/rpc/server/server.go | 5 +++++ pkg/services/notary/notary.go | 5 +++++ pkg/services/oracle/oracle.go | 5 +++++ pkg/services/stateroot/service.go | 1 + pkg/services/stateroot/validators.go | 5 +++++ 8 files changed, 41 insertions(+), 10 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 585e34aa8..28c9b4336 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -63,6 +63,8 @@ type Ledger interface { // Service represents consensus instance. type Service interface { + // Name returns service name. + Name() string // Start initializes dBFT and starts event loop for consensus service. // It must be called only when sufficient amount of peers are connected. Start() @@ -256,6 +258,11 @@ func (s *service) newPrepareRequest() payload.PrepareRequest { return r } +// Name returns service name. +func (s *service) Name() string { + return "consensus" +} + func (s *service) Start() { if s.started.CAS(false, true) { s.log.Info("starting consensus service") diff --git a/pkg/network/server.go b/pkg/network/server.go index 69bce8051..b7c6bb342 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -75,6 +75,7 @@ type ( // Service is a service abstraction (oracle, state root, consensus, etc). Service interface { + Name() string Start() Shutdown() } @@ -100,7 +101,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer - services []Service + services map[string]Service extensHandlers map[string]func(*payload.Extensible) error extensHighPrio string txCallback func(*transaction.Transaction) @@ -177,6 +178,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), log: log, transactions: make(chan *transaction.Transaction, 64), + services: make(map[string]Service), extensHandlers: make(map[string]func(*payload.Extensible) error), stateSync: stSync, } @@ -270,7 +272,7 @@ func (s *Server) Shutdown() { // AddService allows to add a service to be started/stopped by Server. func (s *Server) AddService(svc Service) { - s.services = append(s.services, svc) + s.services[svc.Name()] = svc } // AddExtensibleService register a service that handles extensible payload of some kind. diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 0624b157b..6c1af736f 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -38,8 +38,9 @@ type fakeConsensus struct { var _ consensus.Service = (*fakeConsensus)(nil) -func (f *fakeConsensus) Start() { f.started.Store(true) } -func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } +func (f *fakeConsensus) Name() string { return "fake" } +func (f *fakeConsensus) Start() { f.started.Store(true) } +func (f *fakeConsensus) Shutdown() { f.stopped.Store(true) } func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { f.payloads = append(f.payloads, p) return nil @@ -114,12 +115,12 @@ func TestServerStartAndShutdown(t *testing.T) { p := newLocalPeer(t, s) s.register <- p - assert.True(t, s.services[0].(*fakeConsensus).started.Load()) + assert.True(t, s.services["fake"].(*fakeConsensus).started.Load()) s.Shutdown() <-ch - require.True(t, s.services[0].(*fakeConsensus).stopped.Load()) + require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load()) }) } @@ -431,13 +432,13 @@ func TestConsensus(t *testing.T) { s.chain.(*fakechain.FakeChain).VerifyWitnessF = func() (int64, error) { return 0, nil } require.NoError(t, s.handleMessage(p, msg)) - require.Contains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.Contains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) t.Run("small ValidUntilBlockEnd", func(t *testing.T) { t.Run("current height", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()) require.NoError(t, s.handleMessage(p, msg)) - require.NotContains(t, s.services[0].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) + require.NotContains(t, s.services["fake"].(*fakeConsensus).payloads, msg.Payload.(*payload.Extensible)) }) t.Run("invalid", func(t *testing.T) { msg := newConsensusMessage(0, s.chain.BlockHeight()-1) @@ -468,13 +469,13 @@ func TestTransaction(t *testing.T) { s.register <- p s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) + require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) }) t.Run("bad", func(t *testing.T) { tx := newDummyTx() s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.services[0].(*fakeConsensus).txs, tx) // Consensus receives everything. + require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything. }) } diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 287918ff7..7e83dc68d 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -204,6 +204,11 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S } } +// Name returns service name. +func (s *Server) Name() string { + return "rpc" +} + // Start creates a new JSON-RPC server listening on the configured port. It creates // goroutines needed internally and it returns its errors via errChan passed to New(). func (s *Server) Start() { diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index f6537ac5b..66dd24e78 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -152,6 +152,11 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu }, nil } +// Name returns service name. +func (n *Notary) Name() string { + return "notary" +} + // Start runs Notary module in a separate goroutine. func (n *Notary) Start() { n.Config.Log.Info("starting notary service") diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 5165ba5c5..a67a6f1e2 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -174,6 +174,11 @@ func NewOracle(cfg Config) (*Oracle, error) { return o, nil } +// Name returns service name. +func (o *Oracle) Name() string { + return "oracle" +} + // Shutdown shutdowns Oracle. func (o *Oracle) Shutdown() { close(o.close) diff --git a/pkg/services/stateroot/service.go b/pkg/services/stateroot/service.go index 8da2ae13d..e643d65bf 100644 --- a/pkg/services/stateroot/service.go +++ b/pkg/services/stateroot/service.go @@ -28,6 +28,7 @@ type ( // Service represents state root service. Service interface { + Name() string OnPayload(p *payload.Extensible) error AddSignature(height uint32, validatorIndex int32, sig []byte) error GetConfig() config.StateRoot diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 8c934590f..1b8619a64 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -17,6 +17,11 @@ const ( firstVoteResendDelay = 3 * time.Second ) +// Name returns service name. +func (s *service) Name() string { + return "stateroot" +} + // Start runs service instance in a separate goroutine. func (s *service) Start() { s.log.Info("starting state validation service") From 887fe0634d796868f3aa386899aa9a1ab92ba278 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 22 Apr 2022 11:47:36 +0300 Subject: [PATCH 4/6] rpc: add StartWhenSynchronized option, fix #2433 --- cli/server/server.go | 11 ++++++++--- docs/cli.md | 11 +++++++++++ docs/node-configuration.md | 5 +++++ pkg/rpc/rpc_config.go | 1 + pkg/rpc/server/server.go | 3 +++ 5 files changed, 28 insertions(+), 3 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index 4700c49ae..1526b52f3 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -506,9 +506,12 @@ func startServer(ctx *cli.Context) error { } errChan := make(chan error) rpcServer := server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) + serv.AddService(&rpcServer) go serv.Start(errChan) - rpcServer.Start() + if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized { + rpcServer.Start() + } sighupCh := make(chan os.Signal, 1) signal.Notify(sighupCh, syscall.SIGHUP) @@ -530,12 +533,14 @@ Main: log.Info("SIGHUP received, restarting rpc-server") rpcServer.Shutdown() rpcServer = server.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) - rpcServer.Start() + serv.AddService(&rpcServer) // Replaces old one by service name. + if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { + rpcServer.Start() + } } case <-grace.Done(): signal.Stop(sighupCh) serv.Shutdown() - rpcServer.Shutdown() break Main } } diff --git a/docs/cli.md b/docs/cli.md index dce0bd573..b63b586d1 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -55,6 +55,17 @@ Or specify a different network with appropriate flag like this: By default, the node will run in foreground using current standard output for logging. + +### Node synchronization + +Most of the services (state validation, oracle, consensus and RPC if +configured with `StartWhenSynchronized` option) are only started after the +node is completely synchronizaed because running them before that is either +pointless or even dangerous. The node considers itself to be fully +synchronized with the network if it has more than `MinPeers` neighbours and if +at least 2/3 of them are known to have a height less than or equal to the +current height of the node. + ### Restarting node services To restart some node services without full node restart, send the SIGHUP diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 9591981ba..d8f2dc506 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -136,6 +136,7 @@ RPC: MaxFindResultItems: 100 MaxNEP11Tokens: 100 Port: 10332 + StartWhenSynchronized: false TLSConfig: Address: "" CertFile: serv.crt @@ -158,6 +159,10 @@ where: - `MaxNEP11Tokens` - limit for the number of tokens returned from `getnep11balances` call. - `Port` is an RPC server port it should be bound to. +- `StartWhenSynchronized` controls when RPC server will be started, by default + (`false` setting) it's started immediately and RPC is availabe during node + synchronization. Setting it to `true` will make the node start RPC service only + after full synchronization. - `TLS` section configures TLS protocol. ### State Root Configuration diff --git a/pkg/rpc/rpc_config.go b/pkg/rpc/rpc_config.go index 6e241701f..cc0cdee9b 100644 --- a/pkg/rpc/rpc_config.go +++ b/pkg/rpc/rpc_config.go @@ -17,6 +17,7 @@ type ( MaxFindResultItems int `yaml:"MaxFindResultItems"` MaxNEP11Tokens int `yaml:"MaxNEP11Tokens"` Port uint16 `yaml:"Port"` + StartWhenSynchronized bool `yaml:"StartWhenSynchronized"` TLSConfig TLSConfig `yaml:"TLSConfig"` } diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 7e83dc68d..3bb1f23ca 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -260,6 +260,9 @@ func (s *Server) Start() { func (s *Server) Shutdown() { var httpsErr error + if !s.started.Load() { + return + } // Signal to websocket writer routines and handleSubEvents. close(s.shutdown) From 53423b7c3723e04e4c15f313f4327e7950a73e4b Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 22 Apr 2022 14:45:37 +0300 Subject: [PATCH 5/6] network: fix panic in blockqueue during shutdown panic: send on closed channel goroutine 116 [running]: github.com/nspcc-dev/neo-go/pkg/network.(*blockQueue).putBlock(0xc00011b650, 0xc01e371200) github.com/nspcc-dev/neo-go/pkg/network/blockqueue.go:129 +0x185 github.com/nspcc-dev/neo-go/pkg/network.(*Server).handleBlockCmd(0xc0002d3c00, {0xf69b7f?, 0xc001520010?}, 0xc02eb44000?) github.com/nspcc-dev/neo-go/pkg/network/server.go:607 +0x6f github.com/nspcc-dev/neo-go/pkg/network.(*Server).handleMessage(0xc0002d3c00, {0x121f4c8?, 0xc001528000?}, 0xc01e35cf80) github.com/nspcc-dev/neo-go/pkg/network/server.go:1160 +0x6c5 github.com/nspcc-dev/neo-go/pkg/network.(*TCPPeer).handleIncoming(0xc001528000) github.com/nspcc-dev/neo-go/pkg/network/tcp_peer.go:189 +0x98 created by github.com/nspcc-dev/neo-go/pkg/network.(*TCPPeer).handleConn github.com/nspcc-dev/neo-go/pkg/network/tcp_peer.go:164 +0xcf --- pkg/network/blockqueue.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index f3437d912..992ea54c8 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -105,10 +105,13 @@ func (bq *blockQueue) run() { func (bq *blockQueue) putBlock(block *block.Block) error { h := bq.chain.BlockHeight() bq.queueLock.Lock() + defer bq.queueLock.Unlock() + if bq.discarded.Load() { + return nil + } if block.Index <= h || h+blockCacheSize < block.Index { // can easily happen when fetching the same blocks from // different peers, thus not considered as error - bq.queueLock.Unlock() return nil } pos := indexToPosition(block.Index) @@ -122,7 +125,6 @@ func (bq *blockQueue) putBlock(block *block.Block) error { } } l := bq.len - bq.queueLock.Unlock() // update metrics updateBlockQueueLenMetric(l) select { @@ -142,8 +144,8 @@ func (bq *blockQueue) lastQueued() uint32 { func (bq *blockQueue) discard() { if bq.discarded.CAS(false, true) { - close(bq.checkBlocks) bq.queueLock.Lock() + close(bq.checkBlocks) // Technically we could bq.queue = nil, but this would cost // another if in run(). for i := 0; i < len(bq.queue); i++ { From a2126b92e122889bc0dd813f9bc5ea782ece9152 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 26 Apr 2022 00:13:13 +0300 Subject: [PATCH 6/6] rpc/server: only log errors during Shutdown Sending them down the errChan is not really helpful and it can lead to deadlock. If an error happens during node shutdown, we're exiting anyway, if it happens during service restart, the old server will be dead irrespective of this error (if this affects new one in any way we'll know it soon). --- pkg/rpc/server/server.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 3bb1f23ca..d0f3062a2 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -258,8 +258,6 @@ func (s *Server) Start() { // Shutdown stops the RPC server. It can only be called once. func (s *Server) Shutdown() { - var httpsErr error - if !s.started.Load() { return } @@ -268,21 +266,20 @@ func (s *Server) Shutdown() { if s.config.TLSConfig.Enabled { s.log.Info("shutting down RPC server (https)", zap.String("endpoint", s.https.Addr)) - httpsErr = s.https.Shutdown(context.Background()) + err := s.https.Shutdown(context.Background()) + if err != nil { + s.log.Warn("error during RPC (https) server shutdown", zap.Error(err)) + } } s.log.Info("shutting down RPC server", zap.String("endpoint", s.Addr)) err := s.Server.Shutdown(context.Background()) + if err != nil { + s.log.Warn("error during RPC (http) server shutdown", zap.Error(err)) + } // Wait for handleSubEvents to finish. <-s.executionCh - - if httpsErr != nil { - s.errChan <- httpsErr - } - if err != nil { - s.errChan <- err - } } func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Request) {