services: fix chain locked by WS subscriptions handlers

Blockchain's subscriptions, unsubscriptions and notifications are
handled by a single notificationDispatcher routine. Thus, on attempt
to send the subsequent event to Blockchain's subscribers, dispatcher
can't handle subscriptions\unsubscriptions. Make subscription and
unsubscription to be a non-blocking operation for blockchain on the
server side, otherwise it may cause the dispatcher locks.

To achieve this, use a separate lock for those code that make calls
to blockchain's subscription API and for subscription counters on
the server side.
This commit is contained in:
Anna Shaleva 2022-11-17 16:00:22 +03:00
parent e73c3c7ec4
commit b7f19a54d5

View file

@ -138,11 +138,14 @@ type (
subsLock sync.RWMutex subsLock sync.RWMutex
subscribers map[*subscriber]bool subscribers map[*subscriber]bool
subsCounterLock sync.RWMutex
blockSubs int blockSubs int
executionSubs int executionSubs int
notificationSubs int notificationSubs int
transactionSubs int transactionSubs int
notaryRequestSubs int notaryRequestSubs int
blockCh chan *block.Block blockCh chan *block.Block
executionCh chan *state.AppExecResult executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent notificationCh chan *state.ContainedNotificationEvent
@ -591,14 +594,17 @@ requestloop:
case resChan <- res: case resChan <- res:
} }
} }
s.subsLock.Lock() s.subsLock.Lock()
delete(s.subscribers, subscr) delete(s.subscribers, subscr)
s.subsLock.Unlock()
s.subsCounterLock.Lock()
for _, e := range subscr.feeds { for _, e := range subscr.feeds {
if e.event != neorpc.InvalidEventID { if e.event != neorpc.InvalidEventID {
s.unsubscribeFromChannel(e.event) s.unsubscribeFromChannel(e.event)
} }
} }
s.subsLock.Unlock() s.subsCounterLock.Unlock()
close(resChan) close(resChan)
ws.Close() ws.Close()
} }
@ -2441,9 +2447,9 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
} }
s.subsLock.Lock() s.subsLock.Lock()
defer s.subsLock.Unlock()
select { select {
case <-s.shutdown: case <-s.shutdown:
s.subsLock.Unlock()
return nil, neorpc.NewInternalServerError("server is shutting down") return nil, neorpc.NewInternalServerError("server is shutting down")
default: default:
} }
@ -2454,16 +2460,21 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
} }
} }
if id == len(sub.feeds) { if id == len(sub.feeds) {
s.subsLock.Unlock()
return nil, neorpc.NewInternalServerError("maximum number of subscriptions is reached") return nil, neorpc.NewInternalServerError("maximum number of subscriptions is reached")
} }
sub.feeds[id].event = event sub.feeds[id].event = event
sub.feeds[id].filter = filter sub.feeds[id].filter = filter
s.subsLock.Unlock()
s.subsCounterLock.Lock()
s.subscribeToChannel(event) s.subscribeToChannel(event)
s.subsCounterLock.Unlock()
return strconv.FormatInt(int64(id), 10), nil return strconv.FormatInt(int64(id), 10), nil
} }
// subscribeToChannel subscribes RPC server to appropriate chain events if // subscribeToChannel subscribes RPC server to appropriate chain events if
// it's not yet subscribed for them. It's supposed to be called with s.subsLock // it's not yet subscribed for them. It's supposed to be called with s.subsCounterLock
// taken by the caller. // taken by the caller.
func (s *Server) subscribeToChannel(event neorpc.EventID) { func (s *Server) subscribeToChannel(event neorpc.EventID) {
switch event { switch event {
@ -2502,20 +2513,24 @@ func (s *Server) unsubscribe(reqParams params.Params, sub *subscriber) (interfac
return nil, neorpc.ErrInvalidParams return nil, neorpc.ErrInvalidParams
} }
s.subsLock.Lock() s.subsLock.Lock()
defer s.subsLock.Unlock()
if len(sub.feeds) <= id || sub.feeds[id].event == neorpc.InvalidEventID { if len(sub.feeds) <= id || sub.feeds[id].event == neorpc.InvalidEventID {
s.subsLock.Unlock()
return nil, neorpc.ErrInvalidParams return nil, neorpc.ErrInvalidParams
} }
event := sub.feeds[id].event event := sub.feeds[id].event
sub.feeds[id].event = neorpc.InvalidEventID sub.feeds[id].event = neorpc.InvalidEventID
sub.feeds[id].filter = nil sub.feeds[id].filter = nil
s.subsLock.Unlock()
s.subsCounterLock.Lock()
s.unsubscribeFromChannel(event) s.unsubscribeFromChannel(event)
s.subsCounterLock.Unlock()
return true, nil return true, nil
} }
// unsubscribeFromChannel unsubscribes RPC server from appropriate chain events // unsubscribeFromChannel unsubscribes RPC server from appropriate chain events
// if there are no other subscribers for it. It's supposed to be called with // if there are no other subscribers for it. It must be called with s.subsConutersLock
// s.subsLock taken by the caller. // holding by the caller.
func (s *Server) unsubscribeFromChannel(event neorpc.EventID) { func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
switch event { switch event {
case neorpc.BlockEventID: case neorpc.BlockEventID:
@ -2635,6 +2650,7 @@ chloop:
// should be running concurrently to this one. And even if one is to run // should be running concurrently to this one. And even if one is to run
// after unlock, it'll see closed s.shutdown and won't subscribe. // after unlock, it'll see closed s.shutdown and won't subscribe.
s.subsLock.Lock() s.subsLock.Lock()
s.subsCounterLock.Lock()
// There might be no subscription in reality, but it's not a problem as // There might be no subscription in reality, but it's not a problem as
// core.Blockchain allows unsubscribing non-subscribed channels. // core.Blockchain allows unsubscribing non-subscribed channels.
s.chain.UnsubscribeFromBlocks(s.blockCh) s.chain.UnsubscribeFromBlocks(s.blockCh)
@ -2644,6 +2660,7 @@ chloop:
if s.chain.P2PSigExtensionsEnabled() { if s.chain.P2PSigExtensionsEnabled() {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
} }
s.subsCounterLock.Unlock()
s.subsLock.Unlock() s.subsLock.Unlock()
drainloop: drainloop:
for { for {