From dab13a4e2d8ae4debf99624aac4c56f987a9f7d5 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 21 Apr 2023 17:14:47 +0300 Subject: [PATCH] rpcclient: close WSClient subscriber on overflow Close #2894. It should be noted that the subscriber's channel is being removed from the list of receivers and closed, but it is still *in the list of subscribers* and no unsubscription is performed by WSClient. Which means that RPC server keeps sending notifications to WSClient and WSClient keeps dropping them (because there's no receiver for this subscription and it's OK, WSClient can handle this and this behaviour is documented). However, it's still the caller's duty to call Unsubscribe() method for this subscription. Signed-off-by: Anna Shaleva --- pkg/rpcclient/rpc_test.go | 2 +- pkg/rpcclient/wsclient.go | 166 +++++++++++++++++++++++++-------- pkg/rpcclient/wsclient_test.go | 82 ++++++++++++++++ 3 files changed, 211 insertions(+), 39 deletions(-) diff --git a/pkg/rpcclient/rpc_test.go b/pkg/rpcclient/rpc_test.go index 1e032c7fc..c60e77ec5 100644 --- a/pkg/rpcclient/rpc_test.go +++ b/pkg/rpcclient/rpc_test.go @@ -1803,7 +1803,7 @@ func TestRPCClients(t *testing.T) { }) t.Run("WSClient", func(t *testing.T) { testRPCClient(t, func(ctx context.Context, endpoint string, opts Options) (*Client, error) { - wsc, err := NewWS(ctx, httpURLtoWS(endpoint), WSOptions{opts}) + wsc, err := NewWS(ctx, httpURLtoWS(endpoint), WSOptions{Options: opts}) require.NoError(t, err) wsc.getNextRequestID = getTestRequestID require.NoError(t, wsc.Init()) diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index f6bb034e1..b77e03f4b 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -38,7 +38,11 @@ import ( // will make WSClient wait for the channel reader to get the event and while // it waits every other messages (subscription-related or request replies) // will be blocked. This also means that subscription channel must be properly -// drained after unsubscription. +// drained after unsubscription. If CloseNotificationChannelIfFull option is on +// then the receiver channel will be closed immediately in case if a subsequent +// notification can't be sent to it, which means WSClient's operations are +// unblocking in this mode. No unsubscription is performed in this case, so it's +// still the user responsibility to unsubscribe. // // Any received subscription items (blocks/transactions/nofitications) are passed // via pointers for efficiency, but the actual structures MUST NOT be changed, as @@ -47,7 +51,9 @@ import ( // only sent once per channel. The receiver channel will be closed by the WSClient // immediately after MissedEvent is received from the server; no unsubscription // is performed in this case, so it's the user responsibility to unsubscribe. It -// will also be closed on disconnection from server. +// will also be closed on disconnection from server or on situation when it's +// impossible to send a subsequent notification to the subscriber's channel and +// CloseNotificationChannelIfFull option is on. type WSClient struct { Client // Notifications is a channel that is used to send events received from @@ -92,6 +98,14 @@ type WSClient struct { // WSClient-specific options. See Options documentation for more details. type WSOptions struct { Options + // CloseNotificationChannelIfFull allows WSClient to close a subscriber's + // receive channel in case if the channel isn't read properly and no more + // events can be pushed to it. This option, if set, allows to avoid WSClient + // blocking on a subsequent notification dispatch. However, if enabled, the + // corresponding subscription is kept even after receiver's channel closing, + // thus it's still the caller's duty to call Unsubscribe() for this + // subscription. + CloseNotificationChannelIfFull bool } // notificationReceiver is an interface aimed to provide WS subscriber functionality @@ -102,8 +116,11 @@ type notificationReceiver interface { // Receiver returns notification receiver channel. Receiver() any // TrySend checks whether notification passes receiver filter and sends it - // to the underlying channel if so. - TrySend(ntf Notification) bool + // to the underlying channel if so. It is performed under subscriptions lock + // taken. nonBlocking denotes whether the receiving operation shouldn't block + // the client's operation. It returns whether notification matches the filter + // and whether the receiver channel is overflown. + TrySend(ntf Notification, nonBlocking bool) (bool, bool) // Close closes underlying receiver channel. Close() } @@ -133,12 +150,21 @@ func (r *blockReceiver) Receiver() any { } // TrySend implements notificationReceiver interface. -func (r *blockReceiver) TrySend(ntf Notification) bool { +func (r *blockReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) { if rpcevent.Matches(r, ntf) { - r.ch <- ntf.Value.(*block.Block) - return true + if nonBlocking { + select { + case r.ch <- ntf.Value.(*block.Block): + default: + return true, true + } + } else { + r.ch <- ntf.Value.(*block.Block) + } + + return true, false } - return false + return false, false } // Close implements notificationReceiver interface. @@ -171,12 +197,21 @@ func (r *txReceiver) Receiver() any { } // TrySend implements notificationReceiver interface. -func (r *txReceiver) TrySend(ntf Notification) bool { +func (r *txReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) { if rpcevent.Matches(r, ntf) { - r.ch <- ntf.Value.(*transaction.Transaction) - return true + if nonBlocking { + select { + case r.ch <- ntf.Value.(*transaction.Transaction): + default: + return true, true + } + } else { + r.ch <- ntf.Value.(*transaction.Transaction) + } + + return true, false } - return false + return false, false } // Close implements notificationReceiver interface. @@ -209,12 +244,21 @@ func (r *executionNotificationReceiver) Receiver() any { } // TrySend implements notificationReceiver interface. -func (r *executionNotificationReceiver) TrySend(ntf Notification) bool { +func (r *executionNotificationReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) { if rpcevent.Matches(r, ntf) { - r.ch <- ntf.Value.(*state.ContainedNotificationEvent) - return true + if nonBlocking { + select { + case r.ch <- ntf.Value.(*state.ContainedNotificationEvent): + default: + return true, true + } + } else { + r.ch <- ntf.Value.(*state.ContainedNotificationEvent) + } + + return true, false } - return false + return false, false } // Close implements notificationReceiver interface. @@ -247,12 +291,21 @@ func (r *executionReceiver) Receiver() any { } // TrySend implements notificationReceiver interface. -func (r *executionReceiver) TrySend(ntf Notification) bool { +func (r *executionReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) { if rpcevent.Matches(r, ntf) { - r.ch <- ntf.Value.(*state.AppExecResult) - return true + if nonBlocking { + select { + case r.ch <- ntf.Value.(*state.AppExecResult): + default: + return true, true + } + } else { + r.ch <- ntf.Value.(*state.AppExecResult) + } + + return true, false } - return false + return false, false } // Close implements notificationReceiver interface. @@ -285,12 +338,21 @@ func (r *notaryRequestReceiver) Receiver() any { } // TrySend implements notificationReceiver interface. -func (r *notaryRequestReceiver) TrySend(ntf Notification) bool { +func (r *notaryRequestReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) { if rpcevent.Matches(r, ntf) { - r.ch <- ntf.Value.(*result.NotaryRequestEvent) - return true + if nonBlocking { + select { + case r.ch <- ntf.Value.(*result.NotaryRequestEvent): + default: + return true, true + } + } else { + r.ch <- ntf.Value.(*result.NotaryRequestEvent) + } + + return true, false } - return false + return false, false } // Close implements notificationReceiver interface. @@ -324,12 +386,21 @@ func (r *naiveReceiver) Receiver() any { } // TrySend implements notificationReceiver interface. -func (r *naiveReceiver) TrySend(ntf Notification) bool { +func (r *naiveReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) { if rpcevent.Matches(r, ntf) { - r.ch <- ntf - return true + if nonBlocking { + select { + case r.ch <- ntf: + default: + return true, true + } + } else { + r.ch <- ntf + } + + return true, false } - return false + return false, false } // Close implements notificationReceiver interface. @@ -551,16 +622,30 @@ readloop: c.respLock.Unlock() c.subscriptionsLock.Lock() for rcvrCh, ids := range c.receivers { - rcvr := c.subscriptions[ids[0]] + c.dropSubCh(rcvrCh, ids[0], true) + } + c.subscriptionsLock.Unlock() + c.Client.ctxCancel() +} + +// dropSubCh closes corresponding subscriber's channel and removes it from the +// receivers map. If the channel belongs to a naive subscriber then it will be +// closed manually without call to Close(). The channel is still being kept in +// the subscribers map as technically the server-side subscription still exists +// and the user is responsible for unsubscription. This method must be called +// under subscriptionsLock taken. It's the caller's duty to ensure dropSubCh +// will be called once per channel, otherwise panic will occur. +func (c *WSClient) dropSubCh(rcvrCh any, id string, ignoreCloseNotificationChannelIfFull bool) { + if ignoreCloseNotificationChannelIfFull || c.wsOpts.CloseNotificationChannelIfFull { + rcvr := c.subscriptions[id] _, ok := rcvr.(*naiveReceiver) - if !ok { // naiveReceiver uses c.Notifications that is about to be closed below. - c.subscriptions[ids[0]].Close() + if ok { // naiveReceiver uses c.Notifications that should be handled separately. + close(c.Notifications) + } else { + c.subscriptions[id].Close() } delete(c.receivers, rcvrCh) } - c.subscriptionsLock.Unlock() - close(c.Notifications) - c.Client.ctxCancel() } func (c *WSClient) wsWriter() { @@ -613,15 +698,20 @@ func (c *WSClient) notifySubscribers(ntf Notification) { c.subscriptionsLock.Unlock() return } - c.subscriptionsLock.RLock() - for _, ids := range c.receivers { + c.subscriptionsLock.Lock() + for rcvrCh, ids := range c.receivers { for _, id := range ids { - if c.subscriptions[id].TrySend(ntf) { + ok, dropCh := c.subscriptions[id].TrySend(ntf, c.wsOpts.CloseNotificationChannelIfFull) + if dropCh { + c.dropSubCh(rcvrCh, id, false) + break // strictly single drop per channel + } + if ok { break // strictly one notification per channel } } } - c.subscriptionsLock.RUnlock() + c.subscriptionsLock.Unlock() } func (c *WSClient) unregisterRespChannel(id uint64) { diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index b72d2ec6b..ccd272e3b 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -289,6 +289,88 @@ func TestWSClientEvents(t *testing.T) { require.False(t, ok) } +func TestWSClientNonBlockingEvents(t *testing.T) { + // Use buffered channel as a receiver to check it will be closed by WSClient + // after overflow if CloseNotificationChannelIfFull option is enabled. + const chCap = 3 + bCh := make(chan *block.Block, chCap) + + // Events from RPC server testchain. Require events len to be larger than chCap to reach + // subscriber's chanel overflow. + var events = []string{ + fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose), + fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose), + fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose), + fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose), + fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose), + } + require.True(t, chCap < len(events)) + + var blocksSent atomic.Bool + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.URL.Path == "/ws" && req.Method == "GET" { + var upgrader = websocket.Upgrader{} + ws, err := upgrader.Upgrade(w, req, nil) + require.NoError(t, err) + for _, event := range events { + err = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) + require.NoError(t, err) + err = ws.WriteMessage(1, []byte(event)) + if err != nil { + break + } + } + blocksSent.Store(true) + ws.Close() + return + } + })) + wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), WSOptions{CloseNotificationChannelIfFull: true}) + require.NoError(t, err) + wsc.getNextRequestID = getTestRequestID + wsc.cacheLock.Lock() + wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. + wsc.cache.network = netmode.UnitTestNet + wsc.cacheLock.Unlock() + + // Our server mock is restricted, so perform subscriptions manually. + wsc.subscriptionsLock.Lock() + wsc.subscriptions["0"] = &blockReceiver{ch: bCh} + wsc.subscriptions["1"] = &blockReceiver{ch: bCh} + wsc.receivers[chan<- *block.Block(bCh)] = []string{"0", "1"} + wsc.subscriptionsLock.Unlock() + + // Check that events are sent to WSClient. + require.Eventually(t, func() bool { + return blocksSent.Load() + }, time.Second, 100*time.Millisecond) + + // Check that block receiver channel was removed from the receivers list due to overflow. + require.Eventually(t, func() bool { + wsc.subscriptionsLock.RLock() + defer wsc.subscriptionsLock.RUnlock() + return len(wsc.receivers) == 0 + }, 2*time.Second, 200*time.Millisecond) + + // Check that subscriptions are still there and waiting for the call to Unsubscribe() + // to be excluded from the subscriptions map. + wsc.subscriptionsLock.RLock() + require.True(t, len(wsc.subscriptions) == 2) + wsc.subscriptionsLock.RUnlock() + + // Check that receiver was closed after overflow. + for i := 0; i < chCap; i++ { + _, ok := <-bCh + require.True(t, ok) + } + select { + case _, ok := <-bCh: + require.False(t, ok) + default: + t.Fatal("channel wasn't closed by WSClient") + } +} + func TestWSExecutionVMStateCheck(t *testing.T) { // Will answer successfully if request slips through. srv := initTestServer(t, `{"jsonrpc": "2.0", "id": 1, "result": "55aaff00"}`)