From 57d82c128133e62dd8039afac212bfbc3be62f2b Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 1 Apr 2024 17:50:25 +0300 Subject: [PATCH 1/2] rpcsrv: properly cleanup WS reader in tests Close #3378. Signed-off-by: Anna Shaleva --- pkg/services/rpcsrv/subscription_test.go | 57 +++++++++--------------- 1 file changed, 21 insertions(+), 36 deletions(-) diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index eed19a3ad..0d2c5071d 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -20,8 +20,8 @@ import ( const testOverflow = false -func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool) { - for { +func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool, readerToExitCh chan struct{}) { + for !isFinished.Load() { err := ws.SetReadDeadline(time.Now().Add(time.Second)) if isFinished.Load() { require.Error(t, err) @@ -36,6 +36,7 @@ func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished require.NoError(t, err) msgCh <- body } + close(readerToExitCh) } func callWSGetRaw(t *testing.T, ws *websocket.Conn, msg string, respCh <-chan []byte) *neorpc.Response { @@ -56,7 +57,7 @@ func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification { return resp } -func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *websocket.Conn, chan []byte, *atomic.Bool) { +func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) { chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} @@ -69,8 +70,14 @@ func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *webso // responses from it. respMsgs := make(chan []byte, 16) finishedFlag := &atomic.Bool{} - go wsReader(t, ws, respMsgs, finishedFlag) - return chain, rpcSrv, ws, respMsgs, finishedFlag + readerToExitCh := make(chan struct{}) + go wsReader(t, ws, respMsgs, finishedFlag, readerToExitCh) + t.Cleanup(func() { + finishedFlag.Store(true) + <-readerToExitCh + ws.Close() + }) + return chain, rpcSrv, ws, respMsgs } func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string { @@ -95,8 +102,7 @@ func TestSubscriptions(t *testing.T) { var subIDs = make([]string, 0) var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"} - chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) - + chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t) rpcSrv.coreServer.Start() defer rpcSrv.coreServer.Shutdown() @@ -156,8 +162,6 @@ func TestSubscriptions(t *testing.T) { for _, id := range subIDs { callUnsubscribe(t, c, respMsgs, id) } - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestFilteredSubscriptions(t *testing.T) { @@ -293,7 +297,7 @@ func TestFilteredSubscriptions(t *testing.T) { for name, this := range cases { t.Run(name, func(t *testing.T) { - chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, _, c, respMsgs := initCleanServerAndWSClient(t) // It's used as an end-of-event-stream, so it's always present. blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`) @@ -320,8 +324,6 @@ func TestFilteredSubscriptions(t *testing.T) { callUnsubscribe(t, c, respMsgs, subID) callUnsubscribe(t, c, respMsgs, blockSubID) - finishedFlag.CompareAndSwap(false, true) - c.Close() }) } } @@ -389,7 +391,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { }, } - chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t) rpcSrv.coreServer.Start() // blocks are needed to make GAS deposit for priv0 @@ -421,15 +423,13 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { callUnsubscribe(t, c, respMsgs, subID) }) } - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestFilteredBlockSubscriptions(t *testing.T) { // We can't fit this into TestFilteredSubscriptions, because it uses // blocks as EOF events to wait for. const numBlocks = 10 - chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, _, c, respMsgs := initCleanServerAndWSClient(t) blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`) @@ -458,13 +458,11 @@ func TestFilteredBlockSubscriptions(t *testing.T) { require.Equal(t, 3, int(primary)) } callUnsubscribe(t, c, respMsgs, blockSubID) - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestHeaderOfAddedBlockSubscriptions(t *testing.T) { const numBlocks = 10 - chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, _, c, respMsgs := initCleanServerAndWSClient(t) headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`) @@ -493,13 +491,11 @@ func TestHeaderOfAddedBlockSubscriptions(t *testing.T) { require.Equal(t, 3, int(primary)) } callUnsubscribe(t, c, respMsgs, headerSubID) - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestMaxSubscriptions(t *testing.T) { var subIDs = make([]string, 0) - _, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + _, _, c, respMsgs := initCleanServerAndWSClient(t) for i := 0; i < maxFeeds+1; i++ { var s string @@ -518,9 +514,6 @@ func TestMaxSubscriptions(t *testing.T) { require.Nil(t, resp.Result) } } - - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestBadSubUnsub(t *testing.T) { @@ -542,7 +535,7 @@ func TestBadSubUnsub(t *testing.T) { "bad id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["vasiliy"], "id": 1}`, "not subscribed id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["7"], "id": 1}`, } - _, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + _, _, c, respMsgs := initCleanServerAndWSClient(t) testF := func(t *testing.T, cases map[string]string) func(t *testing.T) { return func(t *testing.T) { @@ -557,9 +550,6 @@ func TestBadSubUnsub(t *testing.T) { } t.Run("subscribe", testF(t, subCases)) t.Run("unsubscribe", testF(t, unsubCases)) - - finishedFlag.CompareAndSwap(false, true) - c.Close() } func doSomeWSRequest(t *testing.T, ws *websocket.Conn) { @@ -625,7 +615,7 @@ func TestSubscriptionOverflow(t *testing.T) { const blockCnt = notificationBufSize * 5 var receivedMiss bool - chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + chain, _, c, respMsgs := initCleanServerAndWSClient(t) resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs) require.Nil(t, resp.Error) @@ -647,9 +637,6 @@ func TestSubscriptionOverflow(t *testing.T) { require.Equal(t, true, receivedMiss) // `Missed` is the last event and there is nothing afterwards. require.Equal(t, 0, len(respMsgs)) - - finishedFlag.CompareAndSwap(false, true) - c.Close() } func TestFilteredSubscriptions_InvalidFilter(t *testing.T) { @@ -663,7 +650,7 @@ func TestFilteredSubscriptions_InvalidFilter(t *testing.T) { params: `["transaction_executed", {"state":"NOTHALT"}]`, }, } - _, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + _, _, c, respMsgs := initCleanServerAndWSClient(t) for name, this := range cases { t.Run(name, func(t *testing.T) { @@ -673,6 +660,4 @@ func TestFilteredSubscriptions_InvalidFilter(t *testing.T) { require.Contains(t, resp.Error.Error(), neorpc.ErrInvalidSubscriptionFilter.Error()) }) } - finishedFlag.CompareAndSwap(false, true) - c.Close() } From 5b30d15f8e1e7de48c158b317f2dd9964a4bb07d Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 1 Apr 2024 18:06:08 +0300 Subject: [PATCH 2/2] rpcsrv: properly cleanup network server in tests Port the idea of https://github.com/nspcc-dev/neo-go/pull/3380. Signed-off-by: Anna Shaleva --- pkg/services/rpcsrv/subscription_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index 0d2c5071d..fbc35bac1 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -57,7 +57,7 @@ func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification { return resp } -func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) { +func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) { chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t) dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} @@ -72,10 +72,16 @@ func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *webso finishedFlag := &atomic.Bool{} readerToExitCh := make(chan struct{}) go wsReader(t, ws, respMsgs, finishedFlag, readerToExitCh) + if len(startNetworkServer) != 0 && startNetworkServer[0] { + rpcSrv.coreServer.Start() + } t.Cleanup(func() { finishedFlag.Store(true) <-readerToExitCh ws.Close() + if len(startNetworkServer) != 0 && startNetworkServer[0] { + rpcSrv.coreServer.Shutdown() + } }) return chain, rpcSrv, ws, respMsgs } @@ -102,9 +108,7 @@ func TestSubscriptions(t *testing.T) { var subIDs = make([]string, 0) var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"} - chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t) - rpcSrv.coreServer.Start() - defer rpcSrv.coreServer.Shutdown() + chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true) for _, feed := range subFeeds { s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed)) @@ -391,8 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { }, } - chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t) - rpcSrv.coreServer.Start() + chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true) // blocks are needed to make GAS deposit for priv0 blocks := getTestBlocks(t)