diff --git a/cli/server/server.go b/cli/server/server.go index e1b9c5edf..1caa4f099 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -491,7 +491,10 @@ func startServer(ctx *cli.Context) error { go serv.Start() if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized { - rpcServer.Start() + // Run RPC server in a separate routine. This is necessary to avoid a potential + // deadlock: Start() can write errors to errChan which is not yet read in the + // current execution context (see for-loop below). + go rpcServer.Start() } sigCh := make(chan os.Signal, 1) @@ -546,7 +549,8 @@ Main: rpcServer = rpcsrv.New(chain, cfgnew.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan) serv.AddService(&rpcServer) if !cfgnew.ApplicationConfiguration.RPC.StartWhenSynchronized || serv.IsInSync() { - rpcServer.Start() + // Here similar to the initial run (see above for-loop), so async. + go rpcServer.Start() } pprof.ShutDown() pprof = metrics.NewPprofService(cfgnew.ApplicationConfiguration.Pprof, log) diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index 88fb38942..116f56c29 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -148,11 +148,12 @@ type ( transactionSubs int notaryRequestSubs int - blockCh chan *block.Block - executionCh chan *state.AppExecResult - notificationCh chan *state.ContainedNotificationEvent - transactionCh chan *transaction.Transaction - notaryRequestCh chan mempoolevent.Event + blockCh chan *block.Block + executionCh chan *state.AppExecResult + notificationCh chan *state.ContainedNotificationEvent + transactionCh chan *transaction.Transaction + notaryRequestCh chan mempoolevent.Event + subEventsToExitCh chan struct{} } // session holds a set of iterators got after invoke* call with corresponding @@ -319,11 +320,12 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, subscribers: make(map[*subscriber]bool), // These are NOT buffered to preserve original order of events. - blockCh: make(chan *block.Block), - executionCh: make(chan *state.AppExecResult), - notificationCh: make(chan *state.ContainedNotificationEvent), - transactionCh: make(chan *transaction.Transaction), - notaryRequestCh: make(chan mempoolevent.Event), + blockCh: make(chan *block.Block), + executionCh: make(chan *state.AppExecResult), + notificationCh: make(chan *state.ContainedNotificationEvent), + transactionCh: make(chan *transaction.Transaction), + notaryRequestCh: make(chan mempoolevent.Event), + subEventsToExitCh: make(chan struct{}), } } @@ -344,6 +346,9 @@ func (s *Server) Start() { s.log.Info("RPC server already started") return } + + go s.handleSubEvents() + for _, srv := range s.http { srv.Handler = http.HandlerFunc(s.handleHTTPRequest) s.log.Info("starting rpc-server", zap.String("endpoint", srv.Addr)) @@ -363,7 +368,6 @@ func (s *Server) Start() { }(srv) } - go s.handleSubEvents() if cfg := s.config.TLSConfig; cfg.Enabled { for _, srv := range s.https { srv.Handler = http.HandlerFunc(s.handleHTTPRequest) @@ -436,7 +440,7 @@ func (s *Server) Shutdown() { } // Wait for handleSubEvents to finish. - <-s.executionCh + <-s.subEventsToExitCh } // SetOracleHandler allows to update oracle handler used by the Server. @@ -2665,6 +2669,8 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) { } } +// handleSubEvents processes Server subscriptions until Shutdown. Upon +// completion signals to subEventCh channel. func (s *Server) handleSubEvents() { var overflowEvent = neorpc.Notification{ JSONRPC: neorpc.JSONRPCVersion, @@ -2778,12 +2784,14 @@ drainloop: } } // It's not required closing these, but since they're drained already - // this is safe and it also allows to give a signal to Shutdown routine. + // this is safe. close(s.blockCh) close(s.transactionCh) close(s.notificationCh) close(s.executionCh) close(s.notaryRequestCh) + // notify Shutdown routine + close(s.subEventsToExitCh) } func (s *Server) blockHeightFromParam(param *params.Param) (uint32, *neorpc.Error) { diff --git a/pkg/services/rpcsrv/server_test.go b/pkg/services/rpcsrv/server_test.go index 2b0b9fb50..4bcda2473 100644 --- a/pkg/services/rpcsrv/server_test.go +++ b/pkg/services/rpcsrv/server_test.go @@ -50,6 +50,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap/zapcore" ) @@ -3331,3 +3332,21 @@ func BenchmarkHandleIn(b *testing.B) { {"type": "Integer", "value": "42"}, {"type": "Boolean", "value": false}]]}`)) }) } + +func TestFailedPreconditionShutdown(t *testing.T) { + _, srv, _ := initClearServerWithCustomConfig(t, func(c *config.Config) { + c.ApplicationConfiguration.RPC.Addresses = []string{"not an address"} + }) + + srv.Start() + require.Positive(t, len(srv.errChan)) // this is how Start reports internal failures + + var stopped atomic.Bool + + go func() { + srv.Shutdown() + stopped.Store(true) + }() + + require.Eventually(t, stopped.Load, 5*time.Second, 100*time.Millisecond, "Shutdown should return") +}