From 29deba45ca5bb74d264b353cd36290d0e2df0e40 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 3 Apr 2024 12:29:04 +0300 Subject: [PATCH] services: refactor test WS reader 1. Replace isFinished atomic variable with a channel, no functional changes here, just use more common way as all our services do. 2. Do not check erors from SetReadDeadline and ReadMessage on exit. It seems to be not quite right because connection is not closed by this moment, and thus, these error checks are racy. 3. Add read timeout for the message reader. It is needed because some tests may leave message unread in the end which results in hanging test cleanup. 4. Add drain loop to message reader in order not to block WS reader on sending message. Ref. https://github.com/nspcc-dev/neo-go/pull/3392#issuecomment-2031590403. It's clear that TestBadSubUnsub is hanging on test cleanup, in particular, on attempt to wait for wsRead routine to exit. The only place where wsRead routine may hang is sending to msgCh in case if receiver is not going to read from this channel: ``` 2024-04-02T08:14:51.4957621Z goroutine 14329 [chan receive]: 2024-04-02T08:14:51.4958010Z github.com/nspcc-dev/neo-go/pkg/services/rpcsrv.initCleanServerAndWSClient.func1() 2024-04-02T08:14:51.4958344Z D:/a/neo-go/neo-go/pkg/services/rpcsrv/subscription_test.go:80 +0x71 2024-04-02T08:14:51.4958457Z testing.(*common).Cleanup.func1() 2024-04-02T08:14:51.4958757Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1175 +0x17a 2024-04-02T08:14:51.4958903Z testing.(*common).runCleanup(0xc002cf5860, 0x0) 2024-04-02T08:14:51.4959193Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1353 +0x262 2024-04-02T08:14:51.4959291Z testing.tRunner.func2() 2024-04-02T08:14:51.4959566Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1683 +0x51 2024-04-02T08:14:51.4959695Z testing.tRunner(0xc002cf5860, 0x141687410) 2024-04-02T08:14:51.4959976Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1695 +0x25e 2024-04-02T08:14:51.4960115Z created by testing.(*T).Run in goroutine 1 2024-04-02T08:14:51.4960385Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1742 +0x826 ``` Signed-off-by: Anna Shaleva --- pkg/services/rpcsrv/subscription_test.go | 58 ++++++++++++++++-------- 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index 48251a3f7..b37780688 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "strings" - "sync/atomic" "testing" "time" @@ -20,21 +19,36 @@ import ( const testOverflow = false -func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool, readerToExitCh chan struct{}) { - for !isFinished.Load() { - err := ws.SetReadDeadline(time.Now().Add(5 * time.Second)) - if isFinished.Load() { - require.Error(t, err) - break +func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, readerStopCh chan struct{}, readerToExitCh chan struct{}) { +readLoop: + for { + select { + case <-readerStopCh: + break readLoop + default: + err := ws.SetReadDeadline(time.Now().Add(5 * time.Second)) + select { + case <-readerStopCh: + break readLoop + default: + require.NoError(t, err) + } + + _, body, err := ws.ReadMessage() + select { + case <-readerStopCh: + break readLoop + default: + require.NoError(t, err) + } + + select { + case msgCh <- body: + case <-time.After(10 * time.Second): + t.Log("exiting wsReader loop: unable to send response to receiver") + break readLoop + } } - require.NoError(t, err) - _, body, err := ws.ReadMessage() - if isFinished.Load() { - require.Error(t, err) - break - } - require.NoError(t, err) - msgCh <- body } close(readerToExitCh) } @@ -69,14 +83,22 @@ func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core // Use buffered channel to read server's messages and then read expected // responses from it. respMsgs := make(chan []byte, 16) - finishedFlag := &atomic.Bool{} + readerStopCh := make(chan struct{}) readerToExitCh := make(chan struct{}) - go wsReader(t, ws, respMsgs, finishedFlag, readerToExitCh) + go wsReader(t, ws, respMsgs, readerStopCh, readerToExitCh) if len(startNetworkServer) != 0 && startNetworkServer[0] { rpcSrv.coreServer.Start() } t.Cleanup(func() { - finishedFlag.Store(true) + drainLoop: + for { + select { + case <-respMsgs: + default: + break drainLoop + } + } + close(readerStopCh) <-readerToExitCh ws.Close() if len(startNetworkServer) != 0 && startNetworkServer[0] {