diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index eed19a3ad..fbc35bac1 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -20,8 +20,8 @@ import ( const testOverflow = false -func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool) { - for { +func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool, readerToExitCh chan struct{}) { + for !isFinished.Load() { err := ws.SetReadDeadline(time.Now().Add(time.Second)) if isFinished.Load() { require.Error(t, err) @@ -36,6 +36,7 @@ func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished require.NoError(t, err) msgCh <- body } + close(readerToExitCh) } func callWSGetRaw(t *testing.T, ws *websocket.Conn, msg string, respCh <-chan []byte) *neorpc.Response { @@ -56,7 +57,7 @@ func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification { return resp } -func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *websocket.Conn, chan []byte, *atomic.Bool) { +func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) { chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} @@ -69,8 +70,20 @@ func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *webso // responses from it. respMsgs := make(chan []byte, 16) finishedFlag := &atomic.Bool{} - go wsReader(t, ws, respMsgs, finishedFlag) - return chain, rpcSrv, ws, respMsgs, finishedFlag + readerToExitCh := make(chan struct{}) + go wsReader(t, ws, respMsgs, finishedFlag, readerToExitCh) + if len(startNetworkServer) != 0 && startNetworkServer[0] { + rpcSrv.coreServer.Start() + } + t.Cleanup(func() { + finishedFlag.Store(true) + <-readerToExitCh + ws.Close() + if len(startNetworkServer) != 0 && startNetworkServer[0] { + rpcSrv.coreServer.Shutdown() + } + }) + return chain, rpcSrv, ws, respMsgs } func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string { @@ -95,10 +108,7 @@ func TestSubscriptions(t *testing.T) { var subIDs = make([]string, 0) var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"} - chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) - - rpcSrv.coreServer.Start() - defer rpcSrv.coreServer.Shutdown() + chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true) for _, feed := range subFeeds { s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed)) @@ -156,8 +166,6 @@ func TestSubscriptions(t *testing.T) { for _, id := range subIDs { callUnsubscribe(t, c, respMsgs, id) } - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestFilteredSubscriptions(t *testing.T) { @@ -293,7 +301,7 @@ func TestFilteredSubscriptions(t *testing.T) { for name, this := range cases { t.Run(name, func(t *testing.T) { - chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, _, c, respMsgs := initCleanServerAndWSClient(t) // It's used as an end-of-event-stream, so it's always present. blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`) @@ -320,8 +328,6 @@ func TestFilteredSubscriptions(t *testing.T) { callUnsubscribe(t, c, respMsgs, subID) callUnsubscribe(t, c, respMsgs, blockSubID) - finishedFlag.CompareAndSwap(false, true) - c.Close() }) } } @@ -389,8 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { }, } - chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) - rpcSrv.coreServer.Start() + chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true) // blocks are needed to make GAS deposit for priv0 blocks := getTestBlocks(t) @@ -421,15 +426,13 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { callUnsubscribe(t, c, respMsgs, subID) }) } - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestFilteredBlockSubscriptions(t *testing.T) { // We can't fit this into TestFilteredSubscriptions, because it uses // blocks as EOF events to wait for. const numBlocks = 10 - chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, _, c, respMsgs := initCleanServerAndWSClient(t) blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`) @@ -458,13 +461,11 @@ func TestFilteredBlockSubscriptions(t *testing.T) { require.Equal(t, 3, int(primary)) } callUnsubscribe(t, c, respMsgs, blockSubID) - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestHeaderOfAddedBlockSubscriptions(t *testing.T) { const numBlocks = 10 - chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, _, c, respMsgs := initCleanServerAndWSClient(t) headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`) @@ -493,13 +494,11 @@ func TestHeaderOfAddedBlockSubscriptions(t *testing.T) { require.Equal(t, 3, int(primary)) } callUnsubscribe(t, c, respMsgs, headerSubID) - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestMaxSubscriptions(t *testing.T) { var subIDs = make([]string, 0) - _, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + _, _, c, respMsgs := initCleanServerAndWSClient(t) for i := 0; i < maxFeeds+1; i++ { var s string @@ -518,9 +517,6 @@ func TestMaxSubscriptions(t *testing.T) { require.Nil(t, resp.Result) } } - - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestBadSubUnsub(t *testing.T) { @@ -542,7 +538,7 @@ func TestBadSubUnsub(t *testing.T) { "bad id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["vasiliy"], "id": 1}`, "not subscribed id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["7"], "id": 1}`, } - _, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + _, _, c, respMsgs := initCleanServerAndWSClient(t) testF := func(t *testing.T, cases map[string]string) func(t *testing.T) { return func(t *testing.T) { @@ -557,9 +553,6 @@ func TestBadSubUnsub(t *testing.T) { } t.Run("subscribe", testF(t, subCases)) t.Run("unsubscribe", testF(t, unsubCases)) - - finishedFlag.CompareAndSwap(false, true) - c.Close() } func doSomeWSRequest(t *testing.T, ws *websocket.Conn) { @@ -625,7 +618,7 @@ func TestSubscriptionOverflow(t *testing.T) { const blockCnt = notificationBufSize * 5 var receivedMiss bool - chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, _, c, respMsgs := initCleanServerAndWSClient(t) resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs) require.Nil(t, resp.Error) @@ -647,9 +640,6 @@ func TestSubscriptionOverflow(t *testing.T) { require.Equal(t, true, receivedMiss) // `Missed` is the last event and there is nothing afterwards. require.Equal(t, 0, len(respMsgs)) - - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestFilteredSubscriptions_InvalidFilter(t *testing.T) { @@ -663,7 +653,7 @@ func TestFilteredSubscriptions_InvalidFilter(t *testing.T) { params: `["transaction_executed", {"state":"NOTHALT"}]`, }, } - _, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + _, _, c, respMsgs := initCleanServerAndWSClient(t) for name, this := range cases { t.Run(name, func(t *testing.T) { @@ -673,6 +663,4 @@ func TestFilteredSubscriptions_InvalidFilter(t *testing.T) { require.Contains(t, resp.Error.Error(), neorpc.ErrInvalidSubscriptionFilter.Error()) }) } - finishedFlag.CompareAndSwap(false, true) - c.Close() }