From 966ff28091b079eb3c0f176368a449921ef79a08 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 12 May 2020 22:38:29 +0300 Subject: [PATCH] rpc: add subscriber queue overflow check It's not practical adding server-side tests for 2.0 (as it requires generating more blocks), so we'll leave it for 3.0. --- pkg/rpc/client/wsclient.go | 14 ++++--- pkg/rpc/client/wsclient_test.go | 1 + pkg/rpc/response/events.go | 6 +++ pkg/rpc/server/server.go | 64 +++++++++++++++++++++++------ pkg/rpc/server/subscription.go | 7 ++-- pkg/rpc/server/subscription_test.go | 1 + 6 files changed, 72 insertions(+), 21 deletions(-) diff --git a/pkg/rpc/client/wsclient.go b/pkg/rpc/client/wsclient.go index 6774dd421..ff8dfce19 100644 --- a/pkg/rpc/client/wsclient.go +++ b/pkg/rpc/client/wsclient.go @@ -125,7 +125,7 @@ readloop: } var slice []json.RawMessage err = json.Unmarshal(rr.RawParams, &slice) - if err != nil || len(slice) != 1 { + if err != nil || (event != response.MissedEventID && len(slice) != 1) { // Bad event received. break } @@ -139,14 +139,18 @@ readloop: val = new(result.NotificationEvent) case response.ExecutionEventID: val = new(result.ApplicationLog) + case response.MissedEventID: + // No value. default: // Bad event received. break readloop } - err = json.Unmarshal(slice[0], val) - if err != nil || len(slice) != 1 { - // Bad event received. - break + if event != response.MissedEventID { + err = json.Unmarshal(slice[0], val) + if err != nil || len(slice) != 1 { + // Bad event received. + break + } } c.Notifications <- Notification{event, val} } else if rr.RawID != nil && (rr.Error != nil || rr.Result != nil) { diff --git a/pkg/rpc/client/wsclient_test.go b/pkg/rpc/client/wsclient_test.go index f747c1710..1fd7d68ca 100644 --- a/pkg/rpc/client/wsclient_test.go +++ b/pkg/rpc/client/wsclient_test.go @@ -108,6 +108,7 @@ func TestWSClientEvents(t *testing.T) { `{"jsonrpc":"2.0","method":"notification_from_execution","params":[{"contract":"0xc2789e5ab9bab828743833965b1df0d5fbcc206f","state":{"type":"Array","value":[{"type":"ByteArray","value":"636f6e74726163742063616c6c"},{"type":"ByteArray","value":"507574"},{"type":"Array","value":[{"type":"ByteArray","value":"746573746b6579"},{"type":"ByteArray","value":"7465737476616c7565"}]}]}}]}`, `{"jsonrpc":"2.0","method":"transaction_added","params":[{"txid":"0x93670859cc8a42f6ea994869c944879678d33d7501d388f5a446a8c7de147df7","size":60,"type":"InvocationTransaction","version":1,"attributes":[],"vin":[],"vout":[],"scripts":[],"script":"097465737476616c756507746573746b657952c103507574676f20ccfbd5f01d5b9633387428b8bab95a9e78c2"}]}`, `{"jsonrpc":"2.0","method":"block_added","params":[{"version":0,"previousblockhash":"0x33f3e0e24542b2ec3b6420e6881c31f6460a39a4e733d88f7557cbcc3b5ed560","merkleroot":"0x9d922c5cfd4c8cd1da7a6b2265061998dc438bd0dea7145192e2858155e6c57a","time":1586154525,"height":205,"nonce":1111,"next_consensus":"0xa21e4f7178607089e4fe9fab1300d1f5a3d348be","script":{"invocation":"4047a444a51218ac856f1cbc629f251c7c88187910534d6ba87847c86a9a73ed4951d203fd0a87f3e65657a7259269473896841f65c0a0c8efc79d270d917f4ff640435ee2f073c94a02f0276dfe4465037475e44e1c34c0decb87ec9c2f43edf688059fc4366a41c673d72ba772b4782c39e79f01cb981247353216d52d2df1651140527eb0dfd80a800fdd7ac8fbe68fc9366db2d71655d8ba235525a97a69a7181b1e069b82091be711c25e504a17c3c55eee6e76e6af13cb488fbe35d5c5d025c34041f39a02ebe9bb08be0e4aaa890f447dc9453209bbfb4705d8f2d869c2b55ee2d41dbec2ee476a059d77fb7c26400284328d05aece5f3168b48f1db1c6f7be0b","verification":"532102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd622102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc22103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee69954ae"},"tx":[{"txid":"0xf9adfde059810f37b3d0686d67f6b29034e0c669537df7e59b40c14a0508b9ed","size":10,"type":"MinerTransaction","version":0,"attributes":[],"vin":[],"vout":[],"scripts":[]},{"txid":"0x93670859cc8a42f6ea994869c944879678d33d7501d388f5a446a8c7de147df7","size":60,"type":"InvocationTransaction","version":1,"attributes":[],"vin":[],"vout":[],"scripts":[],"script":"097465737476616c756507746573746b657952c103507574676f20ccfbd5f01d5b9633387428b8bab95a9e78c2"}]}]}`, + `{"jsonrpc":"2.0","method":"event_missed","params":[]}`, } srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if req.URL.Path == "/ws" && req.Method == "GET" { diff --git a/pkg/rpc/response/events.go b/pkg/rpc/response/events.go index 1efba39a5..3941fcfb8 100644 --- a/pkg/rpc/response/events.go +++ b/pkg/rpc/response/events.go @@ -23,6 +23,8 @@ const ( NotificationEventID // ExecutionEventID is used for `transaction_executed` events. ExecutionEventID + // MissedEventID notifies user of missed events. + MissedEventID EventID = 255 ) // String is a good old Stringer implementation. @@ -36,6 +38,8 @@ func (e EventID) String() string { return "notification_from_execution" case ExecutionEventID: return "transaction_executed" + case MissedEventID: + return "event_missed" default: return "unknown" } @@ -52,6 +56,8 @@ func GetEventIDFromString(s string) (EventID, error) { return NotificationEventID, nil case "transaction_executed": return ExecutionEventID, nil + case "event_missed": + return MissedEventID, nil default: return 255, errors.New("invalid stream name") } diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 2c2a2da41..ae8bee38e 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -295,37 +295,49 @@ func (s *Server) handleRequest(req *request.In, sub *subscriber) response.Raw { func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan response.Raw, subChan <-chan *websocket.PreparedMessage) { pingTicker := time.NewTicker(wsPingPeriod) - defer ws.Close() - defer pingTicker.Stop() +eventloop: for { select { case <-s.shutdown: - // Signal to the reader routine. - ws.Close() - return + break eventloop case event, ok := <-subChan: if !ok { - return + break eventloop } ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)) if err := ws.WritePreparedMessage(event); err != nil { - return + break eventloop } case res, ok := <-resChan: if !ok { - return + break eventloop } ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)) if err := ws.WriteJSON(res); err != nil { - return + break eventloop } case <-pingTicker.C: ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)) if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - return + break eventloop } } } + ws.Close() + pingTicker.Stop() + // Drain notification channel as there might be some goroutines blocked + // on it. +drainloop: + for { + select { + case _, ok := <-subChan: + if !ok { + break drainloop + } + default: + break drainloop + } + } } func (s *Server) handleWsReads(ws *websocket.Conn, resChan chan<- response.Raw, subscr *subscriber) { @@ -1131,7 +1143,7 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface return nil, response.ErrInvalidParams } event, err := response.GetEventIDFromString(streamName) - if err != nil { + if err != nil || event == response.MissedEventID { return nil, response.ErrInvalidParams } s.subsLock.Lock() @@ -1233,6 +1245,20 @@ func (s *Server) unsubscribeFromChannel(event response.EventID) { } func (s *Server) handleSubEvents() { + b, err := json.Marshal(response.Notification{ + JSONRPC: request.JSONRPCVersion, + Event: response.MissedEventID, + Payload: make([]interface{}, 0), + }) + if err != nil { + s.log.Error("fatal: failed to marshal overflow event", zap.Error(err)) + return + } + overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, b) + if err != nil { + s.log.Error("fatal: failed to prepare overflow message", zap.Error(err)) + return + } chloop: for { var resp = response.Notification{ @@ -1259,10 +1285,13 @@ chloop: s.subsLock.RLock() subloop: for sub := range s.subscribers { + if sub.overflown.Load() { + continue + } for _, subID := range sub.feeds { if subID == resp.Event { if msg == nil { - b, err := json.Marshal(resp) + b, err = json.Marshal(resp) if err != nil { s.log.Error("failed to marshal notification", zap.Error(err), @@ -1277,7 +1306,16 @@ chloop: break subloop } } - sub.writer <- msg + select { + case sub.writer <- msg: + default: + sub.overflown.Store(true) + // MissedEvent is to be delivered eventually. + go func(sub *subscriber) { + sub.writer <- overflowMsg + sub.overflown.Store(false) + }(sub) + } // The message is sent only once per subscriber. break } diff --git a/pkg/rpc/server/subscription.go b/pkg/rpc/server/subscription.go index 10c9e25ec..f4c736b08 100644 --- a/pkg/rpc/server/subscription.go +++ b/pkg/rpc/server/subscription.go @@ -3,14 +3,15 @@ package server import ( "github.com/gorilla/websocket" "github.com/nspcc-dev/neo-go/pkg/rpc/response" + "go.uber.org/atomic" ) type ( // subscriber is an event subscriber. subscriber struct { - writer chan<- *websocket.PreparedMessage - ws *websocket.Conn - + writer chan<- *websocket.PreparedMessage + ws *websocket.Conn + overflown atomic.Bool // These work like slots as there is not a lot of them (it's // cheaper doing it this way rather than creating a map), // pointing to EventID is an obvious overkill at the moment, but diff --git a/pkg/rpc/server/subscription_test.go b/pkg/rpc/server/subscription_test.go index bd4fcb792..bfb28f70a 100644 --- a/pkg/rpc/server/subscription_test.go +++ b/pkg/rpc/server/subscription_test.go @@ -160,6 +160,7 @@ func TestBadSubUnsub(t *testing.T) { "no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`, "bad (non-string) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": [1], "id": 1}`, "bad (wrong) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_removed"], "id": 1}`, + "missed event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["event_missed"], "id": 1}`, } var unsubCases = map[string]string{ "no params": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": [], "id": 1}`,