services/rpcsrv: Wait for subscription process to complete when stopped

Previously RPC server shutdown procedure listened to the execution
channel and stopped at the first element that arrived in the queue. This
could lead to the following problems:
 * stopper could steal the execution result from subscriber
 * stopper didn't wait for other subscription actions to complete

Add dedicated channel to `Server` for subscription routine. Close the
channel on `handleSubEvents` return and wait for signal in `Shutdown`.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
This commit is contained in:
Leonard Lyubich 2023-04-13 12:36:39 +04:00
parent a113940f0b
commit e126bcc462

View file

@ -148,11 +148,12 @@ type (
transactionSubs int
notaryRequestSubs int
blockCh chan *block.Block
executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event
blockCh chan *block.Block
executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event
subEventsToExitCh chan struct{}
}
// session holds a set of iterators got after invoke* call with corresponding
@ -319,11 +320,12 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
subscribers: make(map[*subscriber]bool),
// These are NOT buffered to preserve original order of events.
blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
subEventsToExitCh: make(chan struct{}),
}
}
@ -438,7 +440,7 @@ func (s *Server) Shutdown() {
}
// Wait for handleSubEvents to finish.
<-s.executionCh
<-s.subEventsToExitCh
}
// SetOracleHandler allows to update oracle handler used by the Server.
@ -2667,6 +2669,8 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
}
}
// handleSubEvents processes Server subscriptions until Shutdown. Upon
// completion signals to subEventCh channel.
func (s *Server) handleSubEvents() {
var overflowEvent = neorpc.Notification{
JSONRPC: neorpc.JSONRPCVersion,
@ -2780,12 +2784,14 @@ drainloop:
}
}
// It's not required closing these, but since they're drained already
// this is safe and it also allows to give a signal to Shutdown routine.
// this is safe.
close(s.blockCh)
close(s.transactionCh)
close(s.notificationCh)
close(s.executionCh)
close(s.notaryRequestCh)
// notify Shutdown routine
close(s.subEventsToExitCh)
}
func (s *Server) blockHeightFromParam(param *params.Param) (uint32, *neorpc.Error) {