rpcclient/WS: fix data race on concurrent (un)subscription

Every client's (Un)Subscription call does two things: an RPC call and a
subscription map lock (two of maps currently). If we imagine that there is
one routine that tries to subscribe (A) and one routine that tries to
unsubscribe (B), the following sequence can happen:

0. Current number of subscriptions is X
1. B does an RPC and makes number of subscriptions X-1
2. A does an RPC and makes number of subscriptions X again
3. A holds subscription locks and rewrites client's subscription state
   (subscription with ID X now points to a different channel; channel that
   was registered by B is lost and is not related to any real subscription
   but is still included in the `receivers` map)
4. B holds subscription locks and drops subscription X (first, it is an
   error and we have just lost a subscription that we think was made
   successfully second, we have lost a channel in the `receivers` map, and
   no corresponding subscription points to it)
5. X subscription is received by the WS client (in practice it is a new
   block, 100ms, quite often to be sure this issue happens every hour), we
   range through the receivers, see no corresponding subscription, and
   panic.

Closes #3093.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2024-07-24 17:15:50 +03:00
parent 4ff2063539
commit d6eaf6efc2

View file

@ -960,6 +960,14 @@ func (c *WSClient) UnsubscribeAll() error {
// after WS RPC unsubscription request is completed. Until then the subscriber channel
// may still receive WS notifications.
func (c *WSClient) performUnsubscription(id string) error {
c.subscriptionsLock.RLock()
rcvrWas, ok := c.subscriptions[id]
c.subscriptionsLock.RUnlock()
if !ok {
return errors.New("no subscription with this ID")
}
var resp bool
if err := c.performRequest("unsubscribe", []any{id}, &resp); err != nil {
return err
@ -975,6 +983,15 @@ func (c *WSClient) performUnsubscription(id string) error {
if !ok {
return errors.New("no subscription with this ID")
}
cleanUpSubscriptions := true
if rcvrWas.Receiver() != rcvr.Receiver() {
// concurrent subscription has been done and been overwritten; this
// is not this routine's subscription, cleanup only receivers map
rcvr = rcvrWas
cleanUpSubscriptions = false
}
ch := rcvr.Receiver()
ids := c.receivers[ch]
for i, rcvrID := range ids {
@ -988,7 +1005,9 @@ func (c *WSClient) performUnsubscription(id string) error {
} else {
c.receivers[ch] = ids
}
if cleanUpSubscriptions {
delete(c.subscriptions, id)
}
return nil
}