From d6eaf6efc2fb880c294e2fc89a87f265ef9c877f Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 24 Jul 2024 17:15:50 +0300 Subject: [PATCH] rpcclient/WS: fix data race on concurrent (un)subscription Every client's (Un)Subscription call does two things: an RPC call and a subscription map lock (two of maps currently). If we imagine that there is one routine that tries to subscribe (A) and one routine that tries to unsubscribe (B), the following sequence can happen: 0. Current number of subscriptions is X 1. B does an RPC and makes number of subscriptions X-1 2. A does an RPC and makes number of subscriptions X again 3. A holds subscription locks and rewrites client's subscription state (subscription with ID X now points to a different channel; channel that was registered by B is lost and is not related to any real subscription but is still included in the `receivers` map) 4. B holds subscription locks and drops subscription X (first, it is an error and we have just lost a subscription that we think was made successfully second, we have lost a channel in the `receivers` map, and no corresponding subscription points to it) 5. X subscription is received by the WS client (in practice it is a new block, 100ms, quite often to be sure this issue happens every hour), we range through the receivers, see no corresponding subscription, and panic. Closes #3093. Signed-off-by: Pavel Karpy --- pkg/rpcclient/wsclient.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 3b131f890..eed794d22 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -960,6 +960,14 @@ func (c *WSClient) UnsubscribeAll() error { // after WS RPC unsubscription request is completed. Until then the subscriber channel // may still receive WS notifications. func (c *WSClient) performUnsubscription(id string) error { + c.subscriptionsLock.RLock() + rcvrWas, ok := c.subscriptions[id] + c.subscriptionsLock.RUnlock() + + if !ok { + return errors.New("no subscription with this ID") + } + var resp bool if err := c.performRequest("unsubscribe", []any{id}, &resp); err != nil { return err @@ -975,6 +983,15 @@ func (c *WSClient) performUnsubscription(id string) error { if !ok { return errors.New("no subscription with this ID") } + + cleanUpSubscriptions := true + if rcvrWas.Receiver() != rcvr.Receiver() { + // concurrent subscription has been done and been overwritten; this + // is not this routine's subscription, cleanup only receivers map + rcvr = rcvrWas + cleanUpSubscriptions = false + } + ch := rcvr.Receiver() ids := c.receivers[ch] for i, rcvrID := range ids { @@ -988,7 +1005,9 @@ func (c *WSClient) performUnsubscription(id string) error { } else { c.receivers[ch] = ids } - delete(c.subscriptions, id) + if cleanUpSubscriptions { + delete(c.subscriptions, id) + } return nil }