diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 3b131f890..eed794d22 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -960,6 +960,14 @@ func (c *WSClient) UnsubscribeAll() error { // after WS RPC unsubscription request is completed. Until then the subscriber channel // may still receive WS notifications. func (c *WSClient) performUnsubscription(id string) error { + c.subscriptionsLock.RLock() + rcvrWas, ok := c.subscriptions[id] + c.subscriptionsLock.RUnlock() + + if !ok { + return errors.New("no subscription with this ID") + } + var resp bool if err := c.performRequest("unsubscribe", []any{id}, &resp); err != nil { return err @@ -975,6 +983,15 @@ func (c *WSClient) performUnsubscription(id string) error { if !ok { return errors.New("no subscription with this ID") } + + cleanUpSubscriptions := true + if rcvrWas.Receiver() != rcvr.Receiver() { + // concurrent subscription has been done and been overwritten; this + // is not this routine's subscription, cleanup only receivers map + rcvr = rcvrWas + cleanUpSubscriptions = false + } + ch := rcvr.Receiver() ids := c.receivers[ch] for i, rcvrID := range ids { @@ -988,7 +1005,9 @@ func (c *WSClient) performUnsubscription(id string) error { } else { c.receivers[ch] = ids } - delete(c.subscriptions, id) + if cleanUpSubscriptions { + delete(c.subscriptions, id) + } return nil }