rpc: add subscriber queue overflow check
Server-side test is added, but disabled because of its unreliability.
This commit is contained in:
parent
44ae9086b6
commit
c4c080d240
6 changed files with 111 additions and 21 deletions
|
@ -296,37 +296,49 @@ func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Raw {
|
|||
|
||||
func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Raw, subChan <-chan *websocket.PreparedMessage) {
|
||||
pingTicker := time.NewTicker(wsPingPeriod)
|
||||
defer ws.Close()
|
||||
defer pingTicker.Stop()
|
||||
eventloop:
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
// Signal to the reader routine.
|
||||
ws.Close()
|
||||
return
|
||||
break eventloop
|
||||
case event, ok := <-subChan:
|
||||
if !ok {
|
||||
return
|
||||
break eventloop
|
||||
}
|
||||
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
|
||||
if err := ws.WritePreparedMessage(event); err != nil {
|
||||
return
|
||||
break eventloop
|
||||
}
|
||||
case res, ok := <-resChan:
|
||||
if !ok {
|
||||
return
|
||||
break eventloop
|
||||
}
|
||||
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
|
||||
if err := ws.WriteJSON(res); err != nil {
|
||||
return
|
||||
break eventloop
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
|
||||
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
|
||||
return
|
||||
break eventloop
|
||||
}
|
||||
}
|
||||
}
|
||||
ws.Close()
|
||||
pingTicker.Stop()
|
||||
// Drain notification channel as there might be some goroutines blocked
|
||||
// on it.
|
||||
drainloop:
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-subChan:
|
||||
if !ok {
|
||||
break drainloop
|
||||
}
|
||||
default:
|
||||
break drainloop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Raw, subscr *subscriber) {
|
||||
|
@ -1130,7 +1142,7 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface
|
|||
return nil, response.ErrInvalidParams
|
||||
}
|
||||
event, err := response.GetEventIDFromString(streamName)
|
||||
if err != nil {
|
||||
if err != nil || event == response.MissedEventID {
|
||||
return nil, response.ErrInvalidParams
|
||||
}
|
||||
s.subsLock.Lock()
|
||||
|
@ -1232,6 +1244,20 @@ func (s *Server) unsubscribeFromChannel(event response.EventID) {
|
|||
}
|
||||
|
||||
func (s *Server) handleSubEvents() {
|
||||
b, err := json.Marshal(response.Notification{
|
||||
JSONRPC: request.JSONRPCVersion,
|
||||
Event: response.MissedEventID,
|
||||
Payload: make([]interface{}, 0),
|
||||
})
|
||||
if err != nil {
|
||||
s.log.Error("fatal: failed to marshal overflow event", zap.Error(err))
|
||||
return
|
||||
}
|
||||
overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, b)
|
||||
if err != nil {
|
||||
s.log.Error("fatal: failed to prepare overflow message", zap.Error(err))
|
||||
return
|
||||
}
|
||||
chloop:
|
||||
for {
|
||||
var resp = response.Notification{
|
||||
|
@ -1258,10 +1284,13 @@ chloop:
|
|||
s.subsLock.RLock()
|
||||
subloop:
|
||||
for sub := range s.subscribers {
|
||||
if sub.overflown.Load() {
|
||||
continue
|
||||
}
|
||||
for _, subID := range sub.feeds {
|
||||
if subID == resp.Event {
|
||||
if msg == nil {
|
||||
b, err := json.Marshal(resp)
|
||||
b, err = json.Marshal(resp)
|
||||
if err != nil {
|
||||
s.log.Error("failed to marshal notification",
|
||||
zap.Error(err),
|
||||
|
@ -1276,7 +1305,16 @@ chloop:
|
|||
break subloop
|
||||
}
|
||||
}
|
||||
sub.writer <- msg
|
||||
select {
|
||||
case sub.writer <- msg:
|
||||
default:
|
||||
sub.overflown.Store(true)
|
||||
// MissedEvent is to be delivered eventually.
|
||||
go func(sub *subscriber) {
|
||||
sub.writer <- overflowMsg
|
||||
sub.overflown.Store(false)
|
||||
}(sub)
|
||||
}
|
||||
// The message is sent only once per subscriber.
|
||||
break
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue