rpc: add synchronisation for httptest.Server in tests

Add waiting for startSending to ensure that the client is ready before
the server starts sending messages.

Close #3005
Close #3312

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-03-13 11:41:43 +03:00
parent b12ef701f0
commit 0016b6b630

View file

@ -155,11 +155,13 @@ func TestWSClientEvents(t *testing.T) {
fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose), fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose),
`{"jsonrpc":"2.0","method":"event_missed","params":[]}`, // the last one, will trigger receiver channels closing. `{"jsonrpc":"2.0","method":"event_missed","params":[]}`, // the last one, will trigger receiver channels closing.
} }
startSending := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" { if req.URL.Path == "/ws" && req.Method == "GET" {
var upgrader = websocket.Upgrader{} var upgrader = websocket.Upgrader{}
ws, err := upgrader.Upgrade(w, req, nil) ws, err := upgrader.Upgrade(w, req, nil)
require.NoError(t, err) require.NoError(t, err)
<-startSending
for _, event := range events { for _, event := range events {
err = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) err = ws.SetWriteDeadline(time.Now().Add(2 * time.Second))
require.NoError(t, err) require.NoError(t, err)
@ -209,6 +211,7 @@ func TestWSClientEvents(t *testing.T) {
// MissedEvent must close the channels above. // MissedEvent must close the channels above.
wsc.subscriptionsLock.Unlock() wsc.subscriptionsLock.Unlock()
close(startSending)
var ( var (
b1Cnt, b2Cnt int b1Cnt, b2Cnt int
@ -297,11 +300,13 @@ func TestWSClientNonBlockingEvents(t *testing.T) {
require.True(t, chCap < len(events)) require.True(t, chCap < len(events))
var blocksSent atomic.Bool var blocksSent atomic.Bool
startSending := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" { if req.URL.Path == "/ws" && req.Method == "GET" {
var upgrader = websocket.Upgrader{} var upgrader = websocket.Upgrader{}
ws, err := upgrader.Upgrade(w, req, nil) ws, err := upgrader.Upgrade(w, req, nil)
require.NoError(t, err) require.NoError(t, err)
<-startSending
for _, event := range events { for _, event := range events {
err = ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) err = ws.SetWriteDeadline(time.Now().Add(2 * time.Second))
require.NoError(t, err) require.NoError(t, err)
@ -331,6 +336,7 @@ func TestWSClientNonBlockingEvents(t *testing.T) {
wsc.receivers[chan<- *block.Block(bCh)] = []string{"0", "1"} wsc.receivers[chan<- *block.Block(bCh)] = []string{"0", "1"}
wsc.subscriptionsLock.Unlock() wsc.subscriptionsLock.Unlock()
close(startSending)
// Check that events are sent to WSClient. // Check that events are sent to WSClient.
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
return blocksSent.Load() return blocksSent.Load()