From 465d3f43d2a52c72bfc128bfbd6e9b34c2ac7eed Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 13 Apr 2023 12:36:39 +0400 Subject: [PATCH] services/rpcsrv: Wait for subscription process to complete when stopped Previously RPC server shutdown procedure listened to the execution channel and stopped at the first element that arrived in the queue. This could lead to the following problems: * stopper could steal the execution result from subscriber * stopper didn't wait for other subscription actions to complete Add dedicated channel to `Server` for subscription routine. Close the channel on `handleSubEvents` return and wait for signal in `Shutdown`. Signed-off-by: Leonard Lyubich --- pkg/services/rpcsrv/server.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index c0c78ab76..0ba5f8769 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -148,11 +148,12 @@ type ( transactionSubs int notaryRequestSubs int - blockCh chan *block.Block - executionCh chan *state.AppExecResult - notificationCh chan *state.ContainedNotificationEvent - transactionCh chan *transaction.Transaction - notaryRequestCh chan mempoolevent.Event + blockCh chan *block.Block + executionCh chan *state.AppExecResult + notificationCh chan *state.ContainedNotificationEvent + transactionCh chan *transaction.Transaction + notaryRequestCh chan mempoolevent.Event + subEventsToExitCh chan struct{} } // session holds a set of iterators got after invoke* call with corresponding @@ -319,11 +320,12 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, subscribers: make(map[*subscriber]bool), // These are NOT buffered to preserve original order of events. - blockCh: make(chan *block.Block), - executionCh: make(chan *state.AppExecResult), - notificationCh: make(chan *state.ContainedNotificationEvent), - transactionCh: make(chan *transaction.Transaction), - notaryRequestCh: make(chan mempoolevent.Event), + blockCh: make(chan *block.Block), + executionCh: make(chan *state.AppExecResult), + notificationCh: make(chan *state.ContainedNotificationEvent), + transactionCh: make(chan *transaction.Transaction), + notaryRequestCh: make(chan mempoolevent.Event), + subEventsToExitCh: make(chan struct{}), } } @@ -438,7 +440,7 @@ func (s *Server) Shutdown() { } // Wait for handleSubEvents to finish. - <-s.executionCh + <-s.subEventsToExitCh } // SetOracleHandler allows to update oracle handler used by the Server. @@ -2667,6 +2669,8 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) { } } +// handleSubEvents processes Server subscriptions until Shutdown. Upon +// completion signals to subEventCh channel. func (s *Server) handleSubEvents() { var overflowEvent = neorpc.Notification{ JSONRPC: neorpc.JSONRPCVersion, @@ -2780,12 +2784,14 @@ drainloop: } } // It's not required closing these, but since they're drained already - // this is safe and it also allows to give a signal to Shutdown routine. + // this is safe. close(s.blockCh) close(s.transactionCh) close(s.notificationCh) close(s.executionCh) close(s.notaryRequestCh) + // notify Shutdown routine + close(s.subEventsToExitCh) } func (s *Server) blockHeightFromParam(param *params.Param) (uint32, *neorpc.Error) {