rpc: add subscriber queue overflow check

It's not practical adding server-side tests for 2.0 (as it requires generating
more blocks), so we'll leave it for 3.0.
This commit is contained in:
Roman Khimov 2020-05-12 22:38:29 +03:00
parent 1d5734c882
commit 966ff28091
6 changed files with 72 additions and 21 deletions

View file

@ -295,37 +295,49 @@ func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Raw {
func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Raw, subChan <-chan *websocket.PreparedMessage) {
pingTicker := time.NewTicker(wsPingPeriod)
defer ws.Close()
defer pingTicker.Stop()
eventloop:
for {
select {
case <-s.shutdown:
// Signal to the reader routine.
ws.Close()
return
break eventloop
case event, ok := <-subChan:
if !ok {
return
break eventloop
}
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
if err := ws.WritePreparedMessage(event); err != nil {
return
break eventloop
}
case res, ok := <-resChan:
if !ok {
return
break eventloop
}
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
if err := ws.WriteJSON(res); err != nil {
return
break eventloop
}
case <-pingTicker.C:
ws.SetWriteDeadline(time.Now().Add(wsWriteLimit))
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return
break eventloop
}
}
}
ws.Close()
pingTicker.Stop()
// Drain notification channel as there might be some goroutines blocked
// on it.
drainloop:
for {
select {
case _, ok := <-subChan:
if !ok {
break drainloop
}
default:
break drainloop
}
}
}
func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Raw, subscr *subscriber) {
@ -1131,7 +1143,7 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface
return nil, response.ErrInvalidParams
}
event, err := response.GetEventIDFromString(streamName)
if err != nil {
if err != nil || event == response.MissedEventID {
return nil, response.ErrInvalidParams
}
s.subsLock.Lock()
@ -1233,6 +1245,20 @@ func (s *Server) unsubscribeFromChannel(event response.EventID) {
}
func (s *Server) handleSubEvents() {
b, err := json.Marshal(response.Notification{
JSONRPC: request.JSONRPCVersion,
Event: response.MissedEventID,
Payload: make([]interface{}, 0),
})
if err != nil {
s.log.Error("fatal: failed to marshal overflow event", zap.Error(err))
return
}
overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, b)
if err != nil {
s.log.Error("fatal: failed to prepare overflow message", zap.Error(err))
return
}
chloop:
for {
var resp = response.Notification{
@ -1259,10 +1285,13 @@ chloop:
s.subsLock.RLock()
subloop:
for sub := range s.subscribers {
if sub.overflown.Load() {
continue
}
for _, subID := range sub.feeds {
if subID == resp.Event {
if msg == nil {
b, err := json.Marshal(resp)
b, err = json.Marshal(resp)
if err != nil {
s.log.Error("failed to marshal notification",
zap.Error(err),
@ -1277,7 +1306,16 @@ chloop:
break subloop
}
}
sub.writer <- msg
select {
case sub.writer <- msg:
default:
sub.overflown.Store(true)
// MissedEvent is to be delivered eventually.
go func(sub *subscriber) {
sub.writer <- overflowMsg
sub.overflown.Store(false)
}(sub)
}
// The message is sent only once per subscriber.
break
}