diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index c0b68c686..f5d310861 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -136,18 +136,21 @@ type ( sessionsLock sync.Mutex sessions map[string]*session - subsLock sync.RWMutex - subscribers map[*subscriber]bool + subsLock sync.RWMutex + subscribers map[*subscriber]bool + + subsCounterLock sync.RWMutex blockSubs int executionSubs int notificationSubs int transactionSubs int notaryRequestSubs int - blockCh chan *block.Block - executionCh chan *state.AppExecResult - notificationCh chan *state.ContainedNotificationEvent - transactionCh chan *transaction.Transaction - notaryRequestCh chan mempoolevent.Event + + blockCh chan *block.Block + executionCh chan *state.AppExecResult + notificationCh chan *state.ContainedNotificationEvent + transactionCh chan *transaction.Transaction + notaryRequestCh chan mempoolevent.Event } // session holds a set of iterators got after invoke* call with corresponding @@ -591,14 +594,17 @@ requestloop: case resChan <- res: } } + s.subsLock.Lock() delete(s.subscribers, subscr) + s.subsLock.Unlock() + s.subsCounterLock.Lock() for _, e := range subscr.feeds { if e.event != neorpc.InvalidEventID { s.unsubscribeFromChannel(e.event) } } - s.subsLock.Unlock() + s.subsCounterLock.Unlock() close(resChan) ws.Close() } @@ -2441,9 +2447,9 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{ } s.subsLock.Lock() - defer s.subsLock.Unlock() select { case <-s.shutdown: + s.subsLock.Unlock() return nil, neorpc.NewInternalServerError("server is shutting down") default: } @@ -2454,16 +2460,21 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{ } } if id == len(sub.feeds) { + s.subsLock.Unlock() return nil, neorpc.NewInternalServerError("maximum number of subscriptions is reached") } sub.feeds[id].event = event sub.feeds[id].filter = filter + s.subsLock.Unlock() + + s.subsCounterLock.Lock() s.subscribeToChannel(event) + s.subsCounterLock.Unlock() return strconv.FormatInt(int64(id), 10), nil } // subscribeToChannel subscribes RPC server to appropriate chain events if -// it's not yet subscribed for them. It's supposed to be called with s.subsLock +// it's not yet subscribed for them. It's supposed to be called with s.subsCounterLock // taken by the caller. func (s *Server) subscribeToChannel(event neorpc.EventID) { switch event { @@ -2502,20 +2513,24 @@ func (s *Server) unsubscribe(reqParams params.Params, sub *subscriber) (interfac return nil, neorpc.ErrInvalidParams } s.subsLock.Lock() - defer s.subsLock.Unlock() if len(sub.feeds) <= id || sub.feeds[id].event == neorpc.InvalidEventID { + s.subsLock.Unlock() return nil, neorpc.ErrInvalidParams } event := sub.feeds[id].event sub.feeds[id].event = neorpc.InvalidEventID sub.feeds[id].filter = nil + s.subsLock.Unlock() + + s.subsCounterLock.Lock() s.unsubscribeFromChannel(event) + s.subsCounterLock.Unlock() return true, nil } // unsubscribeFromChannel unsubscribes RPC server from appropriate chain events -// if there are no other subscribers for it. It's supposed to be called with -// s.subsLock taken by the caller. +// if there are no other subscribers for it. It must be called with s.subsConutersLock +// holding by the caller. func (s *Server) unsubscribeFromChannel(event neorpc.EventID) { switch event { case neorpc.BlockEventID: @@ -2635,6 +2650,7 @@ chloop: // should be running concurrently to this one. And even if one is to run // after unlock, it'll see closed s.shutdown and won't subscribe. s.subsLock.Lock() + s.subsCounterLock.Lock() // There might be no subscription in reality, but it's not a problem as // core.Blockchain allows unsubscribing non-subscribed channels. s.chain.UnsubscribeFromBlocks(s.blockCh) @@ -2644,6 +2660,7 @@ chloop: if s.chain.P2PSigExtensionsEnabled() { s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) } + s.subsCounterLock.Unlock() s.subsLock.Unlock() drainloop: for {