Merge pull request #3387 from nspcc-dev/fix-test-wsreader
This commit is contained in:
commit
5c995e71b5
1 changed files with 27 additions and 39 deletions
|
@ -20,8 +20,8 @@ import (
|
||||||
|
|
||||||
const testOverflow = false
|
const testOverflow = false
|
||||||
|
|
||||||
func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool) {
|
func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool, readerToExitCh chan struct{}) {
|
||||||
for {
|
for !isFinished.Load() {
|
||||||
err := ws.SetReadDeadline(time.Now().Add(time.Second))
|
err := ws.SetReadDeadline(time.Now().Add(time.Second))
|
||||||
if isFinished.Load() {
|
if isFinished.Load() {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
@ -36,6 +36,7 @@ func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
msgCh <- body
|
msgCh <- body
|
||||||
}
|
}
|
||||||
|
close(readerToExitCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
func callWSGetRaw(t *testing.T, ws *websocket.Conn, msg string, respCh <-chan []byte) *neorpc.Response {
|
func callWSGetRaw(t *testing.T, ws *websocket.Conn, msg string, respCh <-chan []byte) *neorpc.Response {
|
||||||
|
@ -56,7 +57,7 @@ func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification {
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *websocket.Conn, chan []byte, *atomic.Bool) {
|
func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) {
|
||||||
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
|
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
|
||||||
|
|
||||||
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
|
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
|
||||||
|
@ -69,8 +70,20 @@ func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *webso
|
||||||
// responses from it.
|
// responses from it.
|
||||||
respMsgs := make(chan []byte, 16)
|
respMsgs := make(chan []byte, 16)
|
||||||
finishedFlag := &atomic.Bool{}
|
finishedFlag := &atomic.Bool{}
|
||||||
go wsReader(t, ws, respMsgs, finishedFlag)
|
readerToExitCh := make(chan struct{})
|
||||||
return chain, rpcSrv, ws, respMsgs, finishedFlag
|
go wsReader(t, ws, respMsgs, finishedFlag, readerToExitCh)
|
||||||
|
if len(startNetworkServer) != 0 && startNetworkServer[0] {
|
||||||
|
rpcSrv.coreServer.Start()
|
||||||
|
}
|
||||||
|
t.Cleanup(func() {
|
||||||
|
finishedFlag.Store(true)
|
||||||
|
<-readerToExitCh
|
||||||
|
ws.Close()
|
||||||
|
if len(startNetworkServer) != 0 && startNetworkServer[0] {
|
||||||
|
rpcSrv.coreServer.Shutdown()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return chain, rpcSrv, ws, respMsgs
|
||||||
}
|
}
|
||||||
|
|
||||||
func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
|
func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
|
||||||
|
@ -95,10 +108,7 @@ func TestSubscriptions(t *testing.T) {
|
||||||
var subIDs = make([]string, 0)
|
var subIDs = make([]string, 0)
|
||||||
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"}
|
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"}
|
||||||
|
|
||||||
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true)
|
||||||
|
|
||||||
rpcSrv.coreServer.Start()
|
|
||||||
defer rpcSrv.coreServer.Shutdown()
|
|
||||||
|
|
||||||
for _, feed := range subFeeds {
|
for _, feed := range subFeeds {
|
||||||
s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed))
|
s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed))
|
||||||
|
@ -156,8 +166,6 @@ func TestSubscriptions(t *testing.T) {
|
||||||
for _, id := range subIDs {
|
for _, id := range subIDs {
|
||||||
callUnsubscribe(t, c, respMsgs, id)
|
callUnsubscribe(t, c, respMsgs, id)
|
||||||
}
|
}
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFilteredSubscriptions(t *testing.T) {
|
func TestFilteredSubscriptions(t *testing.T) {
|
||||||
|
@ -293,7 +301,7 @@ func TestFilteredSubscriptions(t *testing.T) {
|
||||||
|
|
||||||
for name, this := range cases {
|
for name, this := range cases {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
// It's used as an end-of-event-stream, so it's always present.
|
// It's used as an end-of-event-stream, so it's always present.
|
||||||
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
|
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
|
||||||
|
@ -320,8 +328,6 @@ func TestFilteredSubscriptions(t *testing.T) {
|
||||||
|
|
||||||
callUnsubscribe(t, c, respMsgs, subID)
|
callUnsubscribe(t, c, respMsgs, subID)
|
||||||
callUnsubscribe(t, c, respMsgs, blockSubID)
|
callUnsubscribe(t, c, respMsgs, blockSubID)
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -389,8 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true)
|
||||||
rpcSrv.coreServer.Start()
|
|
||||||
|
|
||||||
// blocks are needed to make GAS deposit for priv0
|
// blocks are needed to make GAS deposit for priv0
|
||||||
blocks := getTestBlocks(t)
|
blocks := getTestBlocks(t)
|
||||||
|
@ -421,15 +426,13 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
|
||||||
callUnsubscribe(t, c, respMsgs, subID)
|
callUnsubscribe(t, c, respMsgs, subID)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFilteredBlockSubscriptions(t *testing.T) {
|
func TestFilteredBlockSubscriptions(t *testing.T) {
|
||||||
// We can't fit this into TestFilteredSubscriptions, because it uses
|
// We can't fit this into TestFilteredSubscriptions, because it uses
|
||||||
// blocks as EOF events to wait for.
|
// blocks as EOF events to wait for.
|
||||||
const numBlocks = 10
|
const numBlocks = 10
|
||||||
chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`)
|
blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`)
|
||||||
|
|
||||||
|
@ -458,13 +461,11 @@ func TestFilteredBlockSubscriptions(t *testing.T) {
|
||||||
require.Equal(t, 3, int(primary))
|
require.Equal(t, 3, int(primary))
|
||||||
}
|
}
|
||||||
callUnsubscribe(t, c, respMsgs, blockSubID)
|
callUnsubscribe(t, c, respMsgs, blockSubID)
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
|
func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
|
||||||
const numBlocks = 10
|
const numBlocks = 10
|
||||||
chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`)
|
headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`)
|
||||||
|
|
||||||
|
@ -493,13 +494,11 @@ func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
|
||||||
require.Equal(t, 3, int(primary))
|
require.Equal(t, 3, int(primary))
|
||||||
}
|
}
|
||||||
callUnsubscribe(t, c, respMsgs, headerSubID)
|
callUnsubscribe(t, c, respMsgs, headerSubID)
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMaxSubscriptions(t *testing.T) {
|
func TestMaxSubscriptions(t *testing.T) {
|
||||||
var subIDs = make([]string, 0)
|
var subIDs = make([]string, 0)
|
||||||
_, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
_, _, c, respMsgs := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
for i := 0; i < maxFeeds+1; i++ {
|
for i := 0; i < maxFeeds+1; i++ {
|
||||||
var s string
|
var s string
|
||||||
|
@ -518,9 +517,6 @@ func TestMaxSubscriptions(t *testing.T) {
|
||||||
require.Nil(t, resp.Result)
|
require.Nil(t, resp.Result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBadSubUnsub(t *testing.T) {
|
func TestBadSubUnsub(t *testing.T) {
|
||||||
|
@ -542,7 +538,7 @@ func TestBadSubUnsub(t *testing.T) {
|
||||||
"bad id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["vasiliy"], "id": 1}`,
|
"bad id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["vasiliy"], "id": 1}`,
|
||||||
"not subscribed id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["7"], "id": 1}`,
|
"not subscribed id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["7"], "id": 1}`,
|
||||||
}
|
}
|
||||||
_, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
_, _, c, respMsgs := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
|
testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
|
@ -557,9 +553,6 @@ func TestBadSubUnsub(t *testing.T) {
|
||||||
}
|
}
|
||||||
t.Run("subscribe", testF(t, subCases))
|
t.Run("subscribe", testF(t, subCases))
|
||||||
t.Run("unsubscribe", testF(t, unsubCases))
|
t.Run("unsubscribe", testF(t, unsubCases))
|
||||||
|
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
|
func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
|
||||||
|
@ -625,7 +618,7 @@ func TestSubscriptionOverflow(t *testing.T) {
|
||||||
const blockCnt = notificationBufSize * 5
|
const blockCnt = notificationBufSize * 5
|
||||||
var receivedMiss bool
|
var receivedMiss bool
|
||||||
|
|
||||||
chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs)
|
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs)
|
||||||
require.Nil(t, resp.Error)
|
require.Nil(t, resp.Error)
|
||||||
|
@ -647,9 +640,6 @@ func TestSubscriptionOverflow(t *testing.T) {
|
||||||
require.Equal(t, true, receivedMiss)
|
require.Equal(t, true, receivedMiss)
|
||||||
// `Missed` is the last event and there is nothing afterwards.
|
// `Missed` is the last event and there is nothing afterwards.
|
||||||
require.Equal(t, 0, len(respMsgs))
|
require.Equal(t, 0, len(respMsgs))
|
||||||
|
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
|
func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
|
||||||
|
@ -663,7 +653,7 @@ func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
|
||||||
params: `["transaction_executed", {"state":"NOTHALT"}]`,
|
params: `["transaction_executed", {"state":"NOTHALT"}]`,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
_, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
|
_, _, c, respMsgs := initCleanServerAndWSClient(t)
|
||||||
|
|
||||||
for name, this := range cases {
|
for name, this := range cases {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
|
@ -673,6 +663,4 @@ func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
|
||||||
require.Contains(t, resp.Error.Error(), neorpc.ErrInvalidSubscriptionFilter.Error())
|
require.Contains(t, resp.Error.Error(), neorpc.ErrInvalidSubscriptionFilter.Error())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
finishedFlag.CompareAndSwap(false, true)
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue