29deba45ca
1. Replace isFinished atomic variable with a channel, no functional changes here, just use more common way as all our services do. 2. Do not check errors from SetReadDeadline and ReadMessage on exit. It seems to be not quite right because connection is not closed by this moment, and thus, these error checks are racy. 3. Add read timeout for the message reader. It is needed because some tests may leave message unread in the end which results in hanging test cleanup. 4. Add drain loop to message reader in order not to block WS reader on sending message. Ref. https://github.com/nspcc-dev/neo-go/pull/3392#issuecomment-2031590403. It's clear that TestBadSubUnsub is hanging on test cleanup, in particular, on attempt to wait for wsRead routine to exit. The only place where wsRead routine may hang is sending to msgCh in case if receiver is not going to read from this channel: ``` 2024-04-02T08:14:51.4957621Z goroutine 14329 [chan receive]: 2024-04-02T08:14:51.4958010Z github.com/nspcc-dev/neo-go/pkg/services/rpcsrv.initCleanServerAndWSClient.func1() 2024-04-02T08:14:51.4958344Z D:/a/neo-go/neo-go/pkg/services/rpcsrv/subscription_test.go:80 +0x71 2024-04-02T08:14:51.4958457Z testing.(*common).Cleanup.func1() 2024-04-02T08:14:51.4958757Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1175 +0x17a 2024-04-02T08:14:51.4958903Z testing.(*common).runCleanup(0xc002cf5860, 0x0) 2024-04-02T08:14:51.4959193Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1353 +0x262 2024-04-02T08:14:51.4959291Z testing.tRunner.func2() 2024-04-02T08:14:51.4959566Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1683 +0x51 2024-04-02T08:14:51.4959695Z testing.tRunner(0xc002cf5860, 0x141687410) 2024-04-02T08:14:51.4959976Z C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1695 +0x25e 2024-04-02T08:14:51.4960115Z created by testing.(*T).Run in goroutine 1 2024-04-02T08:14:51.4960385Z 
C:/hostedtoolcache/windows/go/1.22.1/x64/src/testing/testing.go:1742 +0x826 ``` Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
688 lines
23 KiB
Go
688 lines
23 KiB
Go
package rpcsrv
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/nspcc-dev/neo-go/internal/testchain"
|
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
|
"github.com/nspcc-dev/neo-go/pkg/core"
|
|
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// testOverflow enables TestSubscriptionOverflow. It is false by default, so
// that test returns immediately without doing anything.
const testOverflow = false
|
|
|
|
func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, readerStopCh chan struct{}, readerToExitCh chan struct{}) {
|
|
readLoop:
|
|
for {
|
|
select {
|
|
case <-readerStopCh:
|
|
break readLoop
|
|
default:
|
|
err := ws.SetReadDeadline(time.Now().Add(5 * time.Second))
|
|
select {
|
|
case <-readerStopCh:
|
|
break readLoop
|
|
default:
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
_, body, err := ws.ReadMessage()
|
|
select {
|
|
case <-readerStopCh:
|
|
break readLoop
|
|
default:
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
select {
|
|
case msgCh <- body:
|
|
case <-time.After(10 * time.Second):
|
|
t.Log("exiting wsReader loop: unable to send response to receiver")
|
|
break readLoop
|
|
}
|
|
}
|
|
}
|
|
close(readerToExitCh)
|
|
}
|
|
|
|
func callWSGetRaw(t *testing.T, ws *websocket.Conn, msg string, respCh <-chan []byte) *neorpc.Response {
|
|
var resp = new(neorpc.Response)
|
|
|
|
require.NoError(t, ws.SetWriteDeadline(time.Now().Add(5*time.Second)))
|
|
require.NoError(t, ws.WriteMessage(websocket.TextMessage, []byte(msg)))
|
|
|
|
body := <-respCh
|
|
require.NoError(t, json.Unmarshal(body, resp))
|
|
return resp
|
|
}
|
|
|
|
func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification {
|
|
var resp = new(neorpc.Notification)
|
|
body := <-respCh
|
|
require.NoError(t, json.Unmarshal(body, resp))
|
|
return resp
|
|
}
|
|
|
|
func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) {
|
|
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
|
|
|
|
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
|
|
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
|
ws, r, err := dialer.Dial(url, nil)
|
|
require.NoError(t, err)
|
|
defer r.Body.Close()
|
|
|
|
// Use buffered channel to read server's messages and then read expected
|
|
// responses from it.
|
|
respMsgs := make(chan []byte, 16)
|
|
readerStopCh := make(chan struct{})
|
|
readerToExitCh := make(chan struct{})
|
|
go wsReader(t, ws, respMsgs, readerStopCh, readerToExitCh)
|
|
if len(startNetworkServer) != 0 && startNetworkServer[0] {
|
|
rpcSrv.coreServer.Start()
|
|
}
|
|
t.Cleanup(func() {
|
|
drainLoop:
|
|
for {
|
|
select {
|
|
case <-respMsgs:
|
|
default:
|
|
break drainLoop
|
|
}
|
|
}
|
|
close(readerStopCh)
|
|
<-readerToExitCh
|
|
ws.Close()
|
|
if len(startNetworkServer) != 0 && startNetworkServer[0] {
|
|
rpcSrv.coreServer.Shutdown()
|
|
}
|
|
})
|
|
return chain, rpcSrv, ws, respMsgs
|
|
}
|
|
|
|
func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
|
|
var s string
|
|
resp := callWSGetRaw(t, ws, fmt.Sprintf(`{"jsonrpc": "2.0","method": "subscribe","params": %s,"id": 1}`, params), msgs)
|
|
require.Nil(t, resp.Error)
|
|
require.NotNil(t, resp.Result)
|
|
require.NoError(t, json.Unmarshal(resp.Result, &s))
|
|
return s
|
|
}
|
|
|
|
func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id string) {
|
|
var b bool
|
|
resp := callWSGetRaw(t, ws, fmt.Sprintf(`{"jsonrpc": "2.0","method": "unsubscribe","params": ["%s"],"id": 1}`, id), msgs)
|
|
require.Nil(t, resp.Error)
|
|
require.NotNil(t, resp.Result)
|
|
require.NoError(t, json.Unmarshal(resp.Result, &b))
|
|
require.Equal(t, true, b)
|
|
}
|
|
|
|
func TestSubscriptions(t *testing.T) {
|
|
var subIDs = make([]string, 0)
|
|
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"}
|
|
|
|
chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true)
|
|
|
|
for _, feed := range subFeeds {
|
|
s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed))
|
|
subIDs = append(subIDs, s)
|
|
}
|
|
|
|
for _, b := range getTestBlocks(t) {
|
|
require.NoError(t, chain.AddBlock(b))
|
|
resp := getNotification(t, respMsgs)
|
|
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
|
|
for {
|
|
resp = getNotification(t, respMsgs)
|
|
if resp.Event != neorpc.NotificationEventID {
|
|
break
|
|
}
|
|
}
|
|
for i := 0; i < len(b.Transactions); i++ {
|
|
if i > 0 {
|
|
resp = getNotification(t, respMsgs)
|
|
}
|
|
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
|
|
for {
|
|
resp := getNotification(t, respMsgs)
|
|
if resp.Event == neorpc.NotificationEventID {
|
|
continue
|
|
}
|
|
require.Equal(t, neorpc.TransactionEventID, resp.Event)
|
|
break
|
|
}
|
|
}
|
|
resp = getNotification(t, respMsgs)
|
|
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
|
|
for {
|
|
resp = getNotification(t, respMsgs)
|
|
if resp.Event != neorpc.NotificationEventID {
|
|
break
|
|
}
|
|
}
|
|
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
|
|
resp = getNotification(t, respMsgs)
|
|
require.Equal(t, neorpc.BlockEventID, resp.Event)
|
|
}
|
|
|
|
// We should manually add NotaryRequest to test notification.
|
|
sender := testchain.PrivateKeyByID(0)
|
|
err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, sender, 1, 2_0000_0000, nil))
|
|
require.NoError(t, err)
|
|
for {
|
|
resp := getNotification(t, respMsgs)
|
|
if resp.Event == neorpc.NotaryRequestEventID {
|
|
break
|
|
}
|
|
}
|
|
|
|
for _, id := range subIDs {
|
|
callUnsubscribe(t, c, respMsgs, id)
|
|
}
|
|
}
|
|
|
|
func TestFilteredSubscriptions(t *testing.T) {
|
|
priv0 := testchain.PrivateKeyByID(0)
|
|
var goodSender = priv0.GetScriptHash()
|
|
|
|
var cases = map[string]struct {
|
|
params string
|
|
check func(*testing.T, *neorpc.Notification)
|
|
}{
|
|
"tx matching sender": {
|
|
params: `["transaction_added", {"sender":"` + goodSender.StringLE() + `"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.TransactionEventID, resp.Event)
|
|
sender := rmap["sender"].(string)
|
|
require.Equal(t, address.Uint160ToString(goodSender), sender)
|
|
},
|
|
},
|
|
"tx matching signer": {
|
|
params: `["transaction_added", {"signer":"` + goodSender.StringLE() + `"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.TransactionEventID, resp.Event)
|
|
signers := rmap["signers"].([]any)
|
|
signer0 := signers[0].(map[string]any)
|
|
signer0acc := signer0["account"].(string)
|
|
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
|
|
},
|
|
},
|
|
"tx matching sender and signer": {
|
|
params: `["transaction_added", {"sender":"` + goodSender.StringLE() + `", "signer":"` + goodSender.StringLE() + `"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.TransactionEventID, resp.Event)
|
|
sender := rmap["sender"].(string)
|
|
require.Equal(t, address.Uint160ToString(goodSender), sender)
|
|
signers := rmap["signers"].([]any)
|
|
signer0 := signers[0].(map[string]any)
|
|
signer0acc := signer0["account"].(string)
|
|
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
|
|
},
|
|
},
|
|
"notification matching contract hash": {
|
|
params: `["notification_from_execution", {"contract":"` + testContractHash + `"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.NotificationEventID, resp.Event)
|
|
c := rmap["contract"].(string)
|
|
require.Equal(t, "0x"+testContractHash, c)
|
|
},
|
|
},
|
|
"notification matching name": {
|
|
params: `["notification_from_execution", {"name":"my_pretty_notification"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.NotificationEventID, resp.Event)
|
|
n := rmap["name"].(string)
|
|
require.Equal(t, "my_pretty_notification", n)
|
|
},
|
|
},
|
|
"notification matching contract hash and name": {
|
|
params: `["notification_from_execution", {"contract":"` + testContractHash + `", "name":"my_pretty_notification"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.NotificationEventID, resp.Event)
|
|
c := rmap["contract"].(string)
|
|
require.Equal(t, "0x"+testContractHash, c)
|
|
n := rmap["name"].(string)
|
|
require.Equal(t, "my_pretty_notification", n)
|
|
},
|
|
},
|
|
"execution matching state": {
|
|
params: `["transaction_executed", {"state":"HALT"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
|
|
st := rmap["vmstate"].(string)
|
|
require.Equal(t, "HALT", st)
|
|
},
|
|
},
|
|
"execution matching container": {
|
|
params: `["transaction_executed", {"container":"` + deploymentTxHash + `"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
|
|
tx := rmap["container"].(string)
|
|
require.Equal(t, "0x"+deploymentTxHash, tx)
|
|
},
|
|
},
|
|
"execution matching state and container": {
|
|
params: `["transaction_executed", {"state":"HALT", "container":"` + deploymentTxHash + `"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
|
|
tx := rmap["container"].(string)
|
|
require.Equal(t, "0x"+deploymentTxHash, tx)
|
|
st := rmap["vmstate"].(string)
|
|
require.Equal(t, "HALT", st)
|
|
},
|
|
},
|
|
"tx non-matching": {
|
|
params: `["transaction_added", {"sender":"00112233445566778899aabbccddeeff00112233"}]`,
|
|
check: func(t *testing.T, _ *neorpc.Notification) {
|
|
t.Fatal("unexpected match for EnrollmentTransaction")
|
|
},
|
|
},
|
|
"notification non-matching": {
|
|
params: `["notification_from_execution", {"contract":"00112233445566778899aabbccddeeff00112233"}]`,
|
|
check: func(t *testing.T, _ *neorpc.Notification) {
|
|
t.Fatal("unexpected match for contract 00112233445566778899aabbccddeeff00112233")
|
|
},
|
|
},
|
|
"execution non-matching": {
|
|
// We have single FAULTed transaction in chain, this, use the wrong hash for this test instead of FAULT state.
|
|
params: `["transaction_executed", {"container":"0x` + util.Uint256{}.StringLE() + `"}]`,
|
|
check: func(t *testing.T, n *neorpc.Notification) {
|
|
t.Fatal("unexpected match for faulted execution")
|
|
},
|
|
},
|
|
"header of added block": {
|
|
params: `["header_of_added_block", {"primary": 0, "since": 5}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
|
|
primary := rmap["primary"].(float64)
|
|
require.Equal(t, 0, int(primary))
|
|
index := rmap["index"].(float64)
|
|
require.Less(t, 4, int(index))
|
|
},
|
|
},
|
|
}
|
|
|
|
for name, this := range cases {
|
|
t.Run(name, func(t *testing.T) {
|
|
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
|
|
|
|
// It's used as an end-of-event-stream, so it's always present.
|
|
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
|
|
subID := callSubscribe(t, c, respMsgs, this.params)
|
|
|
|
var lastBlock uint32
|
|
for _, b := range getTestBlocks(t) {
|
|
require.NoError(t, chain.AddBlock(b))
|
|
lastBlock = b.Index
|
|
}
|
|
|
|
for {
|
|
resp := getNotification(t, respMsgs)
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
if resp.Event == neorpc.BlockEventID {
|
|
index := rmap["index"].(float64)
|
|
if uint32(index) == lastBlock {
|
|
break
|
|
}
|
|
continue
|
|
}
|
|
this.check(t, resp)
|
|
}
|
|
|
|
callUnsubscribe(t, c, respMsgs, subID)
|
|
callUnsubscribe(t, c, respMsgs, blockSubID)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
|
|
// We can't fit this into TestFilteredSubscriptions, because notary requests
|
|
// event doesn't depend on blocks events.
|
|
priv0 := testchain.PrivateKeyByID(0)
|
|
var goodSender = priv0.GetScriptHash()
|
|
|
|
var cases = map[string]struct {
|
|
params string
|
|
check func(*testing.T, *neorpc.Notification)
|
|
}{
|
|
"matching sender": {
|
|
params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
|
|
require.Equal(t, "added", rmap["type"].(string))
|
|
req := rmap["notaryrequest"].(map[string]any)
|
|
fbTx := req["fallbacktx"].(map[string]any)
|
|
sender := fbTx["signers"].([]any)[1].(map[string]any)["account"].(string)
|
|
require.Equal(t, "0x"+goodSender.StringLE(), sender)
|
|
},
|
|
},
|
|
"matching signer": {
|
|
params: `["notary_request_event", {"signer":"` + goodSender.StringLE() + `"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
|
|
require.Equal(t, "added", rmap["type"].(string))
|
|
req := rmap["notaryrequest"].(map[string]any)
|
|
mainTx := req["maintx"].(map[string]any)
|
|
signers := mainTx["signers"].([]any)
|
|
signer0 := signers[0].(map[string]any)
|
|
signer0acc := signer0["account"].(string)
|
|
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
|
|
},
|
|
},
|
|
"matching type": {
|
|
params: `["notary_request_event", {"type":"added"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, "added", rmap["type"].(string))
|
|
},
|
|
},
|
|
"matching sender, signer and type": {
|
|
params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `", "signer":"` + goodSender.StringLE() + `","type":"added"}]`,
|
|
check: func(t *testing.T, resp *neorpc.Notification) {
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
|
|
require.Equal(t, "added", rmap["type"].(string))
|
|
req := rmap["notaryrequest"].(map[string]any)
|
|
mainTx := req["maintx"].(map[string]any)
|
|
fbTx := req["fallbacktx"].(map[string]any)
|
|
sender := fbTx["signers"].([]any)[1].(map[string]any)["account"].(string)
|
|
require.Equal(t, "0x"+goodSender.StringLE(), sender)
|
|
signers := mainTx["signers"].([]any)
|
|
signer0 := signers[0].(map[string]any)
|
|
signer0acc := signer0["account"].(string)
|
|
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
|
|
},
|
|
},
|
|
}
|
|
|
|
chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true)
|
|
|
|
// blocks are needed to make GAS deposit for priv0
|
|
blocks := getTestBlocks(t)
|
|
for _, b := range blocks {
|
|
require.NoError(t, chain.AddBlock(b))
|
|
}
|
|
|
|
var nonce uint32 = 100
|
|
for name, this := range cases {
|
|
t.Run(name, func(t *testing.T) {
|
|
subID := callSubscribe(t, c, respMsgs, this.params)
|
|
|
|
err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, priv0, nonce, 2_0000_0000, nil))
|
|
require.NoError(t, err)
|
|
nonce++
|
|
|
|
var resp = new(neorpc.Notification)
|
|
select {
|
|
case body := <-respMsgs:
|
|
require.NoError(t, json.Unmarshal(body, resp))
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timeout waiting for event")
|
|
}
|
|
|
|
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
|
|
this.check(t, resp)
|
|
|
|
callUnsubscribe(t, c, respMsgs, subID)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFilteredBlockSubscriptions(t *testing.T) {
|
|
// We can't fit this into TestFilteredSubscriptions, because it uses
|
|
// blocks as EOF events to wait for.
|
|
const numBlocks = 10
|
|
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
|
|
|
|
blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`)
|
|
|
|
var expectedCnt int
|
|
for i := 0; i < numBlocks; i++ {
|
|
primary := uint32(i % 4)
|
|
if primary == 3 {
|
|
expectedCnt++
|
|
}
|
|
b := testchain.NewBlock(t, chain, 1, primary)
|
|
require.NoError(t, chain.AddBlock(b))
|
|
}
|
|
|
|
for i := 0; i < expectedCnt; i++ {
|
|
var resp = new(neorpc.Notification)
|
|
select {
|
|
case body := <-respMsgs:
|
|
require.NoError(t, json.Unmarshal(body, resp))
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timeout waiting for event")
|
|
}
|
|
|
|
require.Equal(t, neorpc.BlockEventID, resp.Event)
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
primary := rmap["primary"].(float64)
|
|
require.Equal(t, 3, int(primary))
|
|
}
|
|
callUnsubscribe(t, c, respMsgs, blockSubID)
|
|
}
|
|
|
|
func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
|
|
const numBlocks = 10
|
|
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
|
|
|
|
headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`)
|
|
|
|
var expectedCnt int
|
|
for i := 0; i < numBlocks; i++ {
|
|
primary := uint32(i % 4)
|
|
if primary == 3 {
|
|
expectedCnt++
|
|
}
|
|
b := testchain.NewBlock(t, chain, 1, primary)
|
|
require.NoError(t, chain.AddBlock(b))
|
|
}
|
|
|
|
for i := 0; i < expectedCnt; i++ {
|
|
var resp = new(neorpc.Notification)
|
|
select {
|
|
case body := <-respMsgs:
|
|
require.NoError(t, json.Unmarshal(body, resp))
|
|
case <-time.After(time.Second):
|
|
t.Fatal("timeout waiting for event")
|
|
}
|
|
|
|
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
|
|
rmap := resp.Payload[0].(map[string]any)
|
|
primary := rmap["primary"].(float64)
|
|
require.Equal(t, 3, int(primary))
|
|
}
|
|
callUnsubscribe(t, c, respMsgs, headerSubID)
|
|
}
|
|
|
|
func TestMaxSubscriptions(t *testing.T) {
|
|
var subIDs = make([]string, 0)
|
|
_, _, c, respMsgs := initCleanServerAndWSClient(t)
|
|
|
|
for i := 0; i < maxFeeds+1; i++ {
|
|
var s string
|
|
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_added"], "id": 1}`, respMsgs)
|
|
if i < maxFeeds {
|
|
require.Nil(t, resp.Error)
|
|
require.NotNil(t, resp.Result)
|
|
require.NoError(t, json.Unmarshal(resp.Result, &s))
|
|
// Each ID must be unique.
|
|
for _, id := range subIDs {
|
|
require.NotEqual(t, id, s)
|
|
}
|
|
subIDs = append(subIDs, s)
|
|
} else {
|
|
require.NotNil(t, resp.Error)
|
|
require.Nil(t, resp.Result)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestBadSubUnsub(t *testing.T) {
|
|
var subCases = map[string]string{
|
|
"no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`,
|
|
"bad (non-string) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": [1], "id": 1}`,
|
|
"bad (wrong) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_removed"], "id": 1}`,
|
|
"missed event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["event_missed"], "id": 1}`,
|
|
"block invalid filter": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_added", 1], "id": 1}`,
|
|
"tx filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_added", 1], "id": 1}`,
|
|
"tx filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_added", {"state": "HALT"}], "id": 1}`,
|
|
"notification filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["notification_from_execution", "contract"], "id": 1}`,
|
|
"notification filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["notification_from_execution", "name"], "id": 1}`,
|
|
"execution filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_executed", "FAULT"], "id": 1}`,
|
|
"execution filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_executed", {"state": "STOP"}], "id": 1}`,
|
|
}
|
|
var unsubCases = map[string]string{
|
|
"no params": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": [], "id": 1}`,
|
|
"bad id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["vasiliy"], "id": 1}`,
|
|
"not subscribed id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["7"], "id": 1}`,
|
|
}
|
|
_, _, c, respMsgs := initCleanServerAndWSClient(t)
|
|
|
|
testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
|
|
return func(t *testing.T) {
|
|
for n, s := range cases {
|
|
t.Run(n, func(t *testing.T) {
|
|
resp := callWSGetRaw(t, c, s, respMsgs)
|
|
require.NotNil(t, resp.Error)
|
|
require.Nil(t, resp.Result)
|
|
})
|
|
}
|
|
}
|
|
}
|
|
t.Run("subscribe", testF(t, subCases))
|
|
t.Run("unsubscribe", testF(t, unsubCases))
|
|
}
|
|
|
|
func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
|
|
require.NoError(t, ws.SetWriteDeadline(time.Now().Add(5*time.Second)))
|
|
// It could be just about anything including invalid request,
|
|
// we only care about server handling being active.
|
|
require.NoError(t, ws.WriteMessage(websocket.TextMessage, []byte(`{"jsonrpc": "2.0", "method": "getversion", "params": [], "id": 1}`)))
|
|
err := ws.SetReadDeadline(time.Now().Add(5 * time.Second))
|
|
require.NoError(t, err)
|
|
_, _, err = ws.ReadMessage()
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestWSClientsLimit(t *testing.T) {
|
|
for tname, limit := range map[string]int{"default": 0, "8": 8, "disabled": -1} {
|
|
effectiveClients := limit
|
|
if limit == 0 {
|
|
effectiveClients = defaultMaxWebSocketClients
|
|
} else if limit < 0 {
|
|
effectiveClients = 0
|
|
}
|
|
t.Run(tname, func(t *testing.T) {
|
|
_, _, httpSrv := initClearServerWithCustomConfig(t, func(cfg *config.Config) {
|
|
cfg.ApplicationConfiguration.RPC.MaxWebSocketClients = limit
|
|
})
|
|
|
|
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
|
|
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
|
|
wss := make([]*websocket.Conn, effectiveClients)
|
|
|
|
for i := 0; i < len(wss)+1; i++ {
|
|
ws, r, err := dialer.Dial(url, nil)
|
|
if r != nil && r.Body != nil {
|
|
defer r.Body.Close()
|
|
}
|
|
if i < effectiveClients {
|
|
require.NoError(t, err)
|
|
wss[i] = ws
|
|
// Check that it's completely ready.
|
|
doSomeWSRequest(t, ws)
|
|
} else {
|
|
require.Error(t, err)
|
|
}
|
|
}
|
|
// Check connections are still alive (it actually is necessary to add
|
|
// some use of wss to keep connections alive).
|
|
for i := 0; i < len(wss); i++ {
|
|
doSomeWSRequest(t, wss[i])
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// The purpose of this test is to overflow buffers on server side to
|
|
// receive a 'missed' event. But it's actually hard to tell when exactly
|
|
// that's going to happen because of network-level buffering, typical
|
|
// number seen in tests is around ~3500 events, but it's not reliable enough,
|
|
// thus this test is disabled.
|
|
func TestSubscriptionOverflow(t *testing.T) {
|
|
if !testOverflow {
|
|
return
|
|
}
|
|
const blockCnt = notificationBufSize * 5
|
|
var receivedMiss bool
|
|
|
|
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
|
|
|
|
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs)
|
|
require.Nil(t, resp.Error)
|
|
require.NotNil(t, resp.Result)
|
|
|
|
// Push a lot of new blocks, but don't read events for them.
|
|
for i := 0; i < blockCnt; i++ {
|
|
b := testchain.NewBlock(t, chain, 1, 0)
|
|
require.NoError(t, chain.AddBlock(b))
|
|
}
|
|
for i := 0; i < blockCnt; i++ {
|
|
resp := getNotification(t, respMsgs)
|
|
if resp.Event != neorpc.BlockEventID {
|
|
require.Equal(t, neorpc.MissedEventID, resp.Event)
|
|
receivedMiss = true
|
|
break
|
|
}
|
|
}
|
|
require.Equal(t, true, receivedMiss)
|
|
// `Missed` is the last event and there is nothing afterwards.
|
|
require.Equal(t, 0, len(respMsgs))
|
|
}
|
|
|
|
func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
|
|
var cases = map[string]struct {
|
|
params string
|
|
}{
|
|
"notification with long name": {
|
|
params: `["notification_from_execution", {"name":"notification_from_execution_with_long_name"}]`,
|
|
},
|
|
"execution with invalid vm state": {
|
|
params: `["transaction_executed", {"state":"NOTHALT"}]`,
|
|
},
|
|
}
|
|
_, _, c, respMsgs := initCleanServerAndWSClient(t)
|
|
|
|
for name, this := range cases {
|
|
t.Run(name, func(t *testing.T) {
|
|
resp := callWSGetRaw(t, c, fmt.Sprintf(`{"jsonrpc": "2.0","method": "subscribe","params": %s,"id": 1}`, this.params), respMsgs)
|
|
require.NotNil(t, resp.Error)
|
|
require.Nil(t, resp.Result)
|
|
require.Contains(t, resp.Error.Error(), neorpc.ErrInvalidSubscriptionFilter.Error())
|
|
})
|
|
}
|
|
}
|