services/rpcsrv: Wait for subscription process to complete when stopped

Previously RPC server shutdown procedure listened to the execution
channel and stopped at the first element that arrived in the queue. This
could lead to the following problems:
 * stopper could steal the execution result from subscriber
 * stopper didn't wait for other subscription actions to complete

Add dedicated channel to `Server` for subscription routine. Close the
channel on `handleSubEvents` return and wait for signal in `Shutdown`.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
This commit is contained in:
Leonard Lyubich 2023-04-13 12:36:39 +04:00 committed by Anna Shaleva
parent f8227aa5f7
commit 465d3f43d2

View file

@ -148,11 +148,12 @@ type (
transactionSubs int transactionSubs int
notaryRequestSubs int notaryRequestSubs int
blockCh chan *block.Block blockCh chan *block.Block
executionCh chan *state.AppExecResult executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event notaryRequestCh chan mempoolevent.Event
subEventsToExitCh chan struct{}
} }
// session holds a set of iterators got after invoke* call with corresponding // session holds a set of iterators got after invoke* call with corresponding
@ -319,11 +320,12 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
subscribers: make(map[*subscriber]bool), subscribers: make(map[*subscriber]bool),
// These are NOT buffered to preserve original order of events. // These are NOT buffered to preserve original order of events.
blockCh: make(chan *block.Block), blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult), executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *state.ContainedNotificationEvent), notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction), transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event), notaryRequestCh: make(chan mempoolevent.Event),
subEventsToExitCh: make(chan struct{}),
} }
} }
@ -438,7 +440,7 @@ func (s *Server) Shutdown() {
} }
// Wait for handleSubEvents to finish. // Wait for handleSubEvents to finish.
<-s.executionCh <-s.subEventsToExitCh
} }
// SetOracleHandler allows to update oracle handler used by the Server. // SetOracleHandler allows to update oracle handler used by the Server.
@ -2667,6 +2669,8 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
} }
} }
// handleSubEvents processes Server subscriptions until Shutdown. Upon
// completion signals to subEventCh channel.
func (s *Server) handleSubEvents() { func (s *Server) handleSubEvents() {
var overflowEvent = neorpc.Notification{ var overflowEvent = neorpc.Notification{
JSONRPC: neorpc.JSONRPCVersion, JSONRPC: neorpc.JSONRPCVersion,
@ -2780,12 +2784,14 @@ drainloop:
} }
} }
// It's not required closing these, but since they're drained already // It's not required closing these, but since they're drained already
// this is safe and it also allows to give a signal to Shutdown routine. // this is safe.
close(s.blockCh) close(s.blockCh)
close(s.transactionCh) close(s.transactionCh)
close(s.notificationCh) close(s.notificationCh)
close(s.executionCh) close(s.executionCh)
close(s.notaryRequestCh) close(s.notaryRequestCh)
// notify Shutdown routine
close(s.subEventsToExitCh)
} }
func (s *Server) blockHeightFromParam(param *params.Param) (uint32, *neorpc.Error) { func (s *Server) blockHeightFromParam(param *params.Param) (uint32, *neorpc.Error) {