diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 0da3964dc..023af25d0 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/rpcclient" @@ -2034,10 +2035,24 @@ func TestWSClient_Wait(t *testing.T) { rcvr <- aer }() go func() { + // Wait until client is properly subscribed. The real node won't behave like this, + // but the real node has the subsequent blocks to be added that will trigger client's + // waitloops to finish anyway (and the test has only single block, thus, use it careful). require.Eventually(t, func() bool { rpcSrv.subsLock.Lock() defer rpcSrv.subsLock.Unlock() - return len(rpcSrv.subscribers) == 1 + if len(rpcSrv.subscribers) == 1 { // single client + for s := range rpcSrv.subscribers { + var count int + for _, f := range s.feeds { + if f.event != neorpc.InvalidEventID { + count++ + } + } + return count == 2 // subscription for blocks + AERs + } + } + return false }, time.Second, 100*time.Millisecond) require.NoError(t, chain.AddBlock(b)) }() @@ -2057,6 +2072,25 @@ func TestWSClient_Wait(t *testing.T) { t.Fatalf("transaction from block %d failed to be awaited: deadline exceeded", b.Index) } } + // Wait for server/client to properly unsubscribe. In real life subsequent awaiter + // requests may be run concurrently, and it's OK, but it's important for the test + // not to run subscription requests in parallel because block addition is bounded to + // the number of subscribers. + require.Eventually(t, func() bool { + rpcSrv.subsLock.Lock() + defer rpcSrv.subsLock.Unlock() + if len(rpcSrv.subscribers) != 1 { + return false + } + for s := range rpcSrv.subscribers { + for _, f := range s.feeds { + if f.event != neorpc.InvalidEventID { + return false + } + } + } + return true + }, time.Second, 100*time.Millisecond) } var faultedChecked bool @@ -2102,7 +2136,7 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) { tx := b1.Transactions[0] require.NoError(t, chain.AddBlock(b1)) - // After that, subscribe for AERs/blocks and wait. + // After that, subscribe for AERs/blocks. rcvr := make(chan *state.AppExecResult) go func() { aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil) @@ -2110,8 +2144,28 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) { rcvr <- aer }() + // Wait until client is properly subscribed. The real node won't behave like this, + // but the real node has the subsequent blocks to be added that will trigger client's + // waitloops to finish anyway (and the test has only single block, thus, use it careful). + require.Eventually(t, func() bool { + rpcSrv.subsLock.Lock() + defer rpcSrv.subsLock.Unlock() + if len(rpcSrv.subscribers) == 1 { // single client + for s := range rpcSrv.subscribers { + var count int + for _, f := range s.feeds { + if f.event != neorpc.InvalidEventID { + count++ + } + } + return count == 2 // subscription for blocks + AERs + } + } + return false + }, time.Second, 100*time.Millisecond) + // Accept the next block to trigger event-based waiter loop exit and rollback to - // poll-based waiter. + // a poll-based waiter. require.NoError(t, chain.AddBlock(b2)) // Wait for the result. diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index c0b68c686..2befe85d3 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -136,18 +136,21 @@ type ( sessionsLock sync.Mutex sessions map[string]*session - subsLock sync.RWMutex - subscribers map[*subscriber]bool + subsLock sync.RWMutex + subscribers map[*subscriber]bool + + subsCounterLock sync.RWMutex blockSubs int executionSubs int notificationSubs int transactionSubs int notaryRequestSubs int - blockCh chan *block.Block - executionCh chan *state.AppExecResult - notificationCh chan *state.ContainedNotificationEvent - transactionCh chan *transaction.Transaction - notaryRequestCh chan mempoolevent.Event + + blockCh chan *block.Block + executionCh chan *state.AppExecResult + notificationCh chan *state.ContainedNotificationEvent + transactionCh chan *transaction.Transaction + notaryRequestCh chan mempoolevent.Event } // session holds a set of iterators got after invoke* call with corresponding @@ -591,14 +594,17 @@ requestloop: case resChan <- res: } } + s.subsLock.Lock() delete(s.subscribers, subscr) + s.subsLock.Unlock() + s.subsCounterLock.Lock() for _, e := range subscr.feeds { if e.event != neorpc.InvalidEventID { s.unsubscribeFromChannel(e.event) } } - s.subsLock.Unlock() + s.subsCounterLock.Unlock() close(resChan) ws.Close() } @@ -2441,12 +2447,6 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{ } s.subsLock.Lock() - defer s.subsLock.Unlock() - select { - case <-s.shutdown: - return nil, neorpc.NewInternalServerError("server is shutting down") - default: - } var id int for ; id < len(sub.feeds); id++ { if sub.feeds[id].event == neorpc.InvalidEventID { @@ -2454,16 +2454,27 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{ } } if id == len(sub.feeds) { + s.subsLock.Unlock() return nil, neorpc.NewInternalServerError("maximum number of subscriptions is reached") } sub.feeds[id].event = event sub.feeds[id].filter = filter + s.subsLock.Unlock() + + s.subsCounterLock.Lock() + select { + case <-s.shutdown: + s.subsCounterLock.Unlock() + return nil, neorpc.NewInternalServerError("server is shutting down") + default: + } s.subscribeToChannel(event) + s.subsCounterLock.Unlock() return strconv.FormatInt(int64(id), 10), nil } // subscribeToChannel subscribes RPC server to appropriate chain events if -// it's not yet subscribed for them. It's supposed to be called with s.subsLock +// it's not yet subscribed for them. It's supposed to be called with s.subsCounterLock // taken by the caller. func (s *Server) subscribeToChannel(event neorpc.EventID) { switch event { @@ -2502,20 +2513,24 @@ func (s *Server) unsubscribe(reqParams params.Params, sub *subscriber) (interfac return nil, neorpc.ErrInvalidParams } s.subsLock.Lock() - defer s.subsLock.Unlock() if len(sub.feeds) <= id || sub.feeds[id].event == neorpc.InvalidEventID { + s.subsLock.Unlock() return nil, neorpc.ErrInvalidParams } event := sub.feeds[id].event sub.feeds[id].event = neorpc.InvalidEventID sub.feeds[id].filter = nil + s.subsLock.Unlock() + + s.subsCounterLock.Lock() s.unsubscribeFromChannel(event) + s.subsCounterLock.Unlock() return true, nil } // unsubscribeFromChannel unsubscribes RPC server from appropriate chain events -// if there are no other subscribers for it. It's supposed to be called with -// s.subsLock taken by the caller. +// if there are no other subscribers for it. It must be called with s.subsConutersLock +// holding by the caller. func (s *Server) unsubscribeFromChannel(event neorpc.EventID) { switch event { case neorpc.BlockEventID: @@ -2631,10 +2646,10 @@ chloop: } s.subsLock.RUnlock() } - // It's important to do it with lock held because no subscription routine + // It's important to do it with subsCounterLock held because no subscription routine // should be running concurrently to this one. And even if one is to run // after unlock, it'll see closed s.shutdown and won't subscribe. - s.subsLock.Lock() + s.subsCounterLock.Lock() // There might be no subscription in reality, but it's not a problem as // core.Blockchain allows unsubscribing non-subscribed channels. s.chain.UnsubscribeFromBlocks(s.blockCh) @@ -2644,7 +2659,7 @@ chloop: if s.chain.P2PSigExtensionsEnabled() { s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) } - s.subsLock.Unlock() + s.subsCounterLock.Unlock() drainloop: for { select {