From 649877d8f3058f2e003d8dde82948c211a01ed57 Mon Sep 17 00:00:00 2001
From: Leonard Lyubich
Date: Thu, 13 Apr 2023 11:48:18 +0400
Subject: [PATCH 1/4] services/rpcsrv: Test `Server` shutdown with failed precondition

There is an existing problem with the RPC server shutdown freezing after
a start failure due to some init actions (at least HTTP listen); it is
described in #2896. Add a dedicated unit test which checks that
`Shutdown` returns within 5s after the `Start` method encounters internal
problems.

Signed-off-by: Leonard Lyubich
---
 pkg/services/rpcsrv/server_test.go | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/pkg/services/rpcsrv/server_test.go b/pkg/services/rpcsrv/server_test.go
index 2b0b9fb50..4bcda2473 100644
--- a/pkg/services/rpcsrv/server_test.go
+++ b/pkg/services/rpcsrv/server_test.go
@@ -50,6 +50,7 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/wallet"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
 	"go.uber.org/zap/zapcore"
 )
 
@@ -3331,3 +3332,21 @@ func BenchmarkHandleIn(b *testing.B) {
 			{"type": "Integer", "value": "42"}, {"type": "Boolean", "value": false}]]}`))
 	})
 }
+
+func TestFailedPreconditionShutdown(t *testing.T) {
+	_, srv, _ := initClearServerWithCustomConfig(t, func(c *config.Config) {
+		c.ApplicationConfiguration.RPC.Addresses = []string{"not an address"}
+	})
+
+	srv.Start()
+	require.Positive(t, len(srv.errChan)) // this is how Start reports internal failures
+
+	var stopped atomic.Bool
+
+	go func() {
+		srv.Shutdown()
+		stopped.Store(true)
+	}()
+
+	require.Eventually(t, stopped.Load, 5*time.Second, 100*time.Millisecond, "Shutdown should return")
+}

From a113940f0b05ecac15b15dc68f2410d7b8a6dc4d Mon Sep 17 00:00:00 2001
From: Leonard Lyubich
Date: Thu, 13 Apr 2023 12:03:18 +0400
Subject: [PATCH 2/4] services/rpcsrv: Fix potential shutdown deadlock of RPC server

Previously the RPC server could never be shut down completely after a
start precondition failure (in particular, an inability to serve HTTP on
any configured endpoint). The problem was caused by the following facts:
 * the start method ran the subscription routine only after HTTP init
   succeeded
 * the stop method blocked waiting for the subscription routine to return

Run the `handleSubEvents` routine on fresh `Start` unconditionally. With
this change, the `Shutdown` method won't deadlock since `handleSubEvents`
always closes the wait channel.

Refs #2896.

Signed-off-by: Leonard Lyubich
---
 pkg/services/rpcsrv/server.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go
index 88fb38942..78562bc42 100644
--- a/pkg/services/rpcsrv/server.go
+++ b/pkg/services/rpcsrv/server.go
@@ -344,6 +344,9 @@ func (s *Server) Start() {
 		s.log.Info("RPC server already started")
 		return
 	}
+
+	go s.handleSubEvents()
+
 	for _, srv := range s.http {
 		srv.Handler = http.HandlerFunc(s.handleHTTPRequest)
 		s.log.Info("starting rpc-server", zap.String("endpoint", srv.Addr))
@@ -363,7 +366,6 @@ func (s *Server) Start() {
 		}(srv)
 	}
 
-	go s.handleSubEvents()
 	if cfg := s.config.TLSConfig; cfg.Enabled {
 		for _, srv := range s.https {
 			srv.Handler = http.HandlerFunc(s.handleHTTPRequest)
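
To see why the old ordering could hang, here is a minimal, self-contained
sketch of the bug fixed in PATCH 2 (all names below are illustrative, not
the real `rpcsrv` API): `Shutdown` waits on a channel that only the
subscription routine closes, so if a failed listen prevented that routine
from ever starting, the wait blocked forever. Launching the routine
unconditionally, as the patch does, removes the hang.

    package main

    import (
    	"errors"
    	"fmt"
    	"time"
    )

    // Illustrative names only; this is not the real rpcsrv code.
    type server struct{ subDone chan struct{} }

    func (s *server) handleSubEvents() {
    	// ... subscription event loop elided ...
    	close(s.subDone) // the signal Shutdown waits for
    }

    // startBuggy mirrors the pre-patch ordering: on a listen failure the
    // subscription routine is never launched, so subDone is never closed.
    func (s *server) startBuggy() {
    	if err := listen(); err != nil {
    		return
    	}
    	go s.handleSubEvents()
    }

    // startFixed launches the routine unconditionally, as the patch does.
    func (s *server) startFixed() {
    	go s.handleSubEvents()
    	_ = listen()
    }

    func listen() error { return errors.New("not an address") }

    func main() {
    	s := &server{subDone: make(chan struct{})}
    	s.startFixed() // swap in s.startBuggy() to reproduce the hang

    	select {
    	case <-s.subDone: // what Shutdown effectively does
    		fmt.Println("shutdown completed")
    	case <-time.After(time.Second):
    		fmt.Println("shutdown deadlocked")
    	}
    }

With `startBuggy` the select above times out, which is exactly the freeze
the test in PATCH 1 guards against.
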
From e126bcc462fb396c470f81ae70bf98bee7374b82 Mon Sep 17 00:00:00 2001
From: Leonard Lyubich
Date: Thu, 13 Apr 2023 12:36:39 +0400
Subject: [PATCH 3/4] services/rpcsrv: Wait for subscription process to complete when stopped

Previously the RPC server shutdown procedure listened to the execution
channel and stopped at the first element that arrived in the queue. This
could lead to the following problems:
 * the stopper could steal an execution result from a subscriber
 * the stopper didn't wait for other subscription actions to complete

Add a dedicated channel to `Server` for the subscription routine. Close
the channel on `handleSubEvents` return and wait for this signal in
`Shutdown`.

Signed-off-by: Leonard Lyubich
---
 pkg/services/rpcsrv/server.go | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go
index 78562bc42..116f56c29 100644
--- a/pkg/services/rpcsrv/server.go
+++ b/pkg/services/rpcsrv/server.go
@@ -148,11 +148,12 @@ type (
 		transactionSubs   int
 		notaryRequestSubs int
 
-		blockCh         chan *block.Block
-		executionCh     chan *state.AppExecResult
-		notificationCh  chan *state.ContainedNotificationEvent
-		transactionCh   chan *transaction.Transaction
-		notaryRequestCh chan mempoolevent.Event
+		blockCh           chan *block.Block
+		executionCh       chan *state.AppExecResult
+		notificationCh    chan *state.ContainedNotificationEvent
+		transactionCh     chan *transaction.Transaction
+		notaryRequestCh   chan mempoolevent.Event
+		subEventsToExitCh chan struct{}
 	}
 
 	// session holds a set of iterators got after invoke* call with corresponding
@@ -319,11 +320,12 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
 
 		subscribers: make(map[*subscriber]bool),
 		// These are NOT buffered to preserve original order of events.
-		blockCh:         make(chan *block.Block),
-		executionCh:     make(chan *state.AppExecResult),
-		notificationCh:  make(chan *state.ContainedNotificationEvent),
-		transactionCh:   make(chan *transaction.Transaction),
-		notaryRequestCh: make(chan mempoolevent.Event),
+		blockCh:           make(chan *block.Block),
+		executionCh:       make(chan *state.AppExecResult),
+		notificationCh:    make(chan *state.ContainedNotificationEvent),
+		transactionCh:     make(chan *transaction.Transaction),
+		notaryRequestCh:   make(chan mempoolevent.Event),
+		subEventsToExitCh: make(chan struct{}),
 	}
 }
 
@@ -438,7 +440,7 @@ func (s *Server) Shutdown() {
 	}
 
 	// Wait for handleSubEvents to finish.
-	<-s.executionCh
+	<-s.subEventsToExitCh
 }
 
 // SetOracleHandler allows to update oracle handler used by the Server.
@@ -2667,6 +2669,8 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
 	}
 }
 
+// handleSubEvents processes Server subscriptions until Shutdown. Upon
+// completion it signals via the subEventsToExitCh channel.
 func (s *Server) handleSubEvents() {
 	var overflowEvent = neorpc.Notification{
 		JSONRPC: neorpc.JSONRPCVersion,
@@ -2780,12 +2784,14 @@ drainloop:
 		}
 	}
 	// It's not required closing these, but since they're drained already
-	// this is safe and it also allows to give a signal to Shutdown routine.
+	// this is safe.
 	close(s.blockCh)
 	close(s.transactionCh)
 	close(s.notificationCh)
 	close(s.executionCh)
 	close(s.notaryRequestCh)
+	// notify Shutdown routine
+	close(s.subEventsToExitCh)
 }
 
 func (s *Server) blockHeightFromParam(param *params.Param) (uint32, *neorpc.Error) {
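
The pattern PATCH 3 switches to is the standard Go "done channel": the
event loop owns a dedicated `chan struct{}` and closes it on return,
while data channels stay reserved for subscribers. A minimal sketch under
illustrative names (not the real `rpcsrv` types):

    package main

    import "fmt"

    // Illustrative reduction of the dedicated exit-channel pattern.
    type server struct {
    	executionCh chan string   // events meant for subscribers only
    	subExitCh   chan struct{} // closed when the event loop returns
    }

    func (s *server) handleSubEvents(stop <-chan struct{}) {
    	defer close(s.subExitCh) // a close reaches every waiter
    	for {
    		select {
    		case e := <-s.executionCh:
    			fmt.Println("subscriber got:", e)
    		case <-stop:
    			return
    		}
    	}
    }

    func main() {
    	stop := make(chan struct{})
    	s := &server{
    		executionCh: make(chan string),
    		subExitCh:   make(chan struct{}),
    	}
    	go s.handleSubEvents(stop)

    	s.executionCh <- "exec result" // consumed by the loop, not by the stopper
    	close(stop)                    // ask the loop to return
    	<-s.subExitCh                  // Shutdown-style wait on the dedicated channel
    	fmt.Println("stopped")
    }

Closing a channel is a broadcast: it wakes every waiter exactly once and
never consumes a queued event, which is why the stopper can no longer
steal an execution result from a subscriber.
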
From e29c33e449bfca6b9481da4e9c2e5a2d40881dec Mon Sep 17 00:00:00 2001
From: Leonard Lyubich
Date: Thu, 13 Apr 2023 12:53:58 +0400
Subject: [PATCH 4/4] cli/node: Fix deadlock produced by instant RPC service start

If `StartWhenSynchronized` is unset in the config, the `node` command
starts the RPC service instantly. Previously this laid the ground for a
deadlock: the command started the RPC server synchronously while,
according to the server implementation, all internal failures are sent to
the parameterized error channel. The deadlock occurred because the main
routine wasn't reading the channel yet.

Run `rpcsrv.Server.Start` in a separate goroutine in `startServer`. This
prevents the potential deadlock caused by writing into an unread channel.

Fixes #2896.

Signed-off-by: Leonard Lyubich
---
 cli/server/server.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/cli/server/server.go b/cli/server/server.go
index bcea6b115..030cacafb 100644
--- a/cli/server/server.go
+++ b/cli/server/server.go
@@ -491,7 +491,10 @@ func startServer(ctx *cli.Context) error {
 
 	go serv.Start(errChan)
 	if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized {
-		rpcServer.Start()
+		// Run RPC server in a separate routine. This is necessary to avoid a potential
+		// deadlock: Start() can write errors to errChan which is not yet read in the
+		// current execution context (see for-loop below).
+		go rpcServer.Start()
 	}
 
 	sigCh := make(chan os.Signal, 1)
@@ -546,7 +549,8 @@ Main:
 			rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
 			serv.AddService(&rpcServer)
 			if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() {
-				rpcServer.Start()
+				// Same as the initial start above: run asynchronously so errChan stays drained.
+				go rpcServer.Start()
 			}
 			pprof.ShutDown()
 			pprof = metrics.NewPprofService(cfgnew.ApplicationConfiguration.Pprof, log)
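
The root cause of PATCH 4 reduces to a few lines of Go (names here are
hypothetical, not the actual `cli/server` code): a send on an unbuffered
channel, or on a buffered one whose buffer is full, blocks until someone
receives, so a synchronous `Start` that reports an error this way wedges
the caller before it ever reaches its channel-reading loop.

    package main

    import (
    	"errors"
    	"fmt"
    )

    // start stands in for rpcsrv.Server.Start in miniature: failures are
    // pushed into the supplied error channel instead of being returned.
    func start(errCh chan<- error) {
    	errCh <- errors.New("listen failed") // blocks until somebody receives
    }

    func main() {
    	errCh := make(chan error) // a full buffered channel blocks the same way

    	// Calling start(errCh) synchronously here would never return: main
    	// would be stuck in the send and never reach the receive below. The
    	// fix is the same as in the patch: run it in its own goroutine.
    	go start(errCh)

    	fmt.Println("received:", <-errCh)
    }

The same reasoning applies to both call sites patched above: the main
routine must stay free to drain errChan while the RPC server starts.
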