neo-go/pkg/services/rpcsrv/subscription_test.go

package rpcsrv

import (
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/internal/testchain"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
)
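
// testOverflow enables TestSubscriptionOverflow, see the comment to that test
// for the reason it's disabled by default.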
const testOverflow = false
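
// wsReader forwards messages read from ws to msgCh until readerStopCh is
// closed or the receiver fails to accept a message within 10 seconds. Read
// errors fail the test unless the reader is being stopped. readerToExitCh is
// closed on exit.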
func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, readerStopCh chan struct{}, readerToExitCh chan struct{}) {
readLoop:
for {
select {
case <-readerStopCh:
break readLoop
default:
err := ws.SetReadDeadline(time.Now().Add(5 * time.Second))
select {
case <-readerStopCh:
break readLoop
default:
require.NoError(t, err)
}
_, body, err := ws.ReadMessage()
select {
case <-readerStopCh:
break readLoop
default:
require.NoError(t, err)
}
select {
case msgCh <- body:
case <-time.After(10 * time.Second):
t.Log("exiting wsReader loop: unable to send response to receiver")
break readLoop
}
}
}
close(readerToExitCh)
}
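
// callWSGetRaw writes msg to ws and returns the next message received from
// respCh unmarshalled into an RPC response.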
func callWSGetRaw(t *testing.T, ws *websocket.Conn, msg string, respCh <-chan []byte) *neorpc.Response {
var resp = new(neorpc.Response)
require.NoError(t, ws.SetWriteDeadline(time.Now().Add(5*time.Second)))
require.NoError(t, ws.WriteMessage(websocket.TextMessage, []byte(msg)))
body := <-respCh
require.NoError(t, json.Unmarshal(body, resp))
return resp
}
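
// getNotification reads the next message from respCh and unmarshals it into
// an RPC notification.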
func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification {
var resp = new(neorpc.Notification)
body := <-respCh
require.NoError(t, json.Unmarshal(body, resp))
return resp
}
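
// initCleanServerAndWSClient creates a clean in-memory chain and an RPC server
// for it, dials a WebSocket connection to the server and starts a background
// reader pushing incoming messages into the returned channel. If
// startNetworkServer is given and true, the underlying P2P server is started
// as well. Everything is shut down via t.Cleanup.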
func initCleanServerAndWSClient(t *testing.T, startNetworkServer ...bool) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) {
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
ws, r, err := dialer.Dial(url, nil)
require.NoError(t, err)
defer r.Body.Close()
// Use buffered channel to read server's messages and then read expected
// responses from it.
respMsgs := make(chan []byte, 16)
readerStopCh := make(chan struct{})
readerToExitCh := make(chan struct{})
go wsReader(t, ws, respMsgs, readerStopCh, readerToExitCh)
if len(startNetworkServer) != 0 && startNetworkServer[0] {
rpcSrv.coreServer.Start()
}
t.Cleanup(func() {
drainLoop:
for {
select {
case <-respMsgs:
default:
break drainLoop
}
}
close(readerStopCh)
ws.Close()
<-readerToExitCh
if len(startNetworkServer) != 0 && startNetworkServer[0] {
rpcSrv.coreServer.Shutdown()
}
})
return chain, rpcSrv, ws, respMsgs
}
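
// callSubscribe performs a subscribe call with the given parameters and
// returns the subscription ID.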
func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
var s string
resp := callWSGetRaw(t, ws, fmt.Sprintf(`{"jsonrpc": "2.0","method": "subscribe","params": %s,"id": 1}`, params), msgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &s))
return s
}
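
// callUnsubscribe cancels the subscription with the given ID and checks that
// the server confirms the removal.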
func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id string) {
var b bool
resp := callWSGetRaw(t, ws, fmt.Sprintf(`{"jsonrpc": "2.0","method": "unsubscribe","params": ["%s"],"id": 1}`, id), msgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &b))
require.Equal(t, true, b)
}
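
// TestSubscriptions subscribes to all supported feeds, adds test blocks and a
// notary request to the chain and checks the resulting event stream.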
func TestSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"}
chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true)
for _, feed := range subFeeds {
s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed))
subIDs = append(subIDs, s)
}
for _, b := range getTestBlocks(t) {
require.NoError(t, chain.AddBlock(b))
resp := getNotification(t, respMsgs)
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
for {
resp = getNotification(t, respMsgs)
if resp.Event != neorpc.NotificationEventID {
break
}
}
for i := 0; i < len(b.Transactions); i++ {
if i > 0 {
resp = getNotification(t, respMsgs)
}
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
for {
resp := getNotification(t, respMsgs)
if resp.Event == neorpc.NotificationEventID {
continue
}
require.Equal(t, neorpc.TransactionEventID, resp.Event)
break
}
}
resp = getNotification(t, respMsgs)
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
for {
resp = getNotification(t, respMsgs)
if resp.Event != neorpc.NotificationEventID {
break
}
}
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
resp = getNotification(t, respMsgs)
require.Equal(t, neorpc.BlockEventID, resp.Event)
}
// A NotaryRequest has to be added manually to test its notification.
sender := testchain.PrivateKeyByID(0)
err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, sender, 1, 2_0000_0000, nil))
require.NoError(t, err)
for {
resp := getNotification(t, respMsgs)
if resp.Event == neorpc.NotaryRequestEventID {
break
}
}
for _, id := range subIDs {
callUnsubscribe(t, c, respMsgs, id)
}
}
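
// TestFilteredSubscriptions checks subscription filters for transaction,
// notification, execution and block header events.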
func TestFilteredSubscriptions(t *testing.T) {
priv0 := testchain.PrivateKeyByID(0)
var goodSender = priv0.GetScriptHash()
var cases = map[string]struct {
params string
check func(*testing.T, *neorpc.Notification)
}{
"tx matching sender": {
params: `["transaction_added", {"sender":"` + goodSender.StringLE() + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.TransactionEventID, resp.Event)
sender := rmap["sender"].(string)
require.Equal(t, address.Uint160ToString(goodSender), sender)
},
},
"tx matching signer": {
params: `["transaction_added", {"signer":"` + goodSender.StringLE() + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.TransactionEventID, resp.Event)
signers := rmap["signers"].([]any)
signer0 := signers[0].(map[string]any)
signer0acc := signer0["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
},
},
"tx matching sender and signer": {
params: `["transaction_added", {"sender":"` + goodSender.StringLE() + `", "signer":"` + goodSender.StringLE() + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.TransactionEventID, resp.Event)
sender := rmap["sender"].(string)
require.Equal(t, address.Uint160ToString(goodSender), sender)
signers := rmap["signers"].([]any)
signer0 := signers[0].(map[string]any)
signer0acc := signer0["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
},
},
"notification matching contract hash": {
params: `["notification_from_execution", {"contract":"` + testContractHash + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.NotificationEventID, resp.Event)
c := rmap["contract"].(string)
require.Equal(t, "0x"+testContractHash, c)
},
},
"notification matching name": {
params: `["notification_from_execution", {"name":"my_pretty_notification"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.NotificationEventID, resp.Event)
n := rmap["name"].(string)
require.Equal(t, "my_pretty_notification", n)
},
},
"notification matching contract hash and name": {
params: `["notification_from_execution", {"contract":"` + testContractHash + `", "name":"my_pretty_notification"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.NotificationEventID, resp.Event)
c := rmap["contract"].(string)
require.Equal(t, "0x"+testContractHash, c)
n := rmap["name"].(string)
require.Equal(t, "my_pretty_notification", n)
},
},
"execution matching state": {
params: `["transaction_executed", {"state":"HALT"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
st := rmap["vmstate"].(string)
require.Equal(t, "HALT", st)
},
},
"execution matching container": {
params: `["transaction_executed", {"container":"` + deploymentTxHash + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
tx := rmap["container"].(string)
require.Equal(t, "0x"+deploymentTxHash, tx)
},
},
"execution matching state and container": {
params: `["transaction_executed", {"state":"HALT", "container":"` + deploymentTxHash + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.ExecutionEventID, resp.Event)
tx := rmap["container"].(string)
require.Equal(t, "0x"+deploymentTxHash, tx)
st := rmap["vmstate"].(string)
require.Equal(t, "HALT", st)
},
},
"tx non-matching": {
params: `["transaction_added", {"sender":"00112233445566778899aabbccddeeff00112233"}]`,
check: func(t *testing.T, _ *neorpc.Notification) {
t.Fatal("unexpected match for EnrollmentTransaction")
},
},
"notification non-matching": {
params: `["notification_from_execution", {"contract":"00112233445566778899aabbccddeeff00112233"}]`,
check: func(t *testing.T, _ *neorpc.Notification) {
t.Fatal("unexpected match for contract 00112233445566778899aabbccddeeff00112233")
},
},
"execution non-matching": {
// We have a single FAULTed transaction in the chain, thus, use a wrong hash for this test instead of the FAULT state.
params: `["transaction_executed", {"container":"0x` + util.Uint256{}.StringLE() + `"}]`,
check: func(t *testing.T, n *neorpc.Notification) {
t.Fatal("unexpected match for faulted execution")
},
},
"header of added block": {
params: `["header_of_added_block", {"primary": 0, "since": 5}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
primary := rmap["primary"].(float64)
require.Equal(t, 0, int(primary))
index := rmap["index"].(float64)
require.Less(t, 4, int(index))
},
},
}
for name, this := range cases {
t.Run(name, func(t *testing.T) {
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
// It's used as an end-of-event-stream marker, so it's always present.
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
subID := callSubscribe(t, c, respMsgs, this.params)
var lastBlock uint32
for _, b := range getTestBlocks(t) {
require.NoError(t, chain.AddBlock(b))
lastBlock = b.Index
}
for {
resp := getNotification(t, respMsgs)
rmap := resp.Payload[0].(map[string]any)
if resp.Event == neorpc.BlockEventID {
index := rmap["index"].(float64)
if uint32(index) == lastBlock {
break
}
continue
}
this.check(t, resp)
}
callUnsubscribe(t, c, respMsgs, subID)
callUnsubscribe(t, c, respMsgs, blockSubID)
})
}
}
func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
// We can't fit this into TestFilteredSubscriptions, because the notary request
// event doesn't depend on block events.
priv0 := testchain.PrivateKeyByID(0)
var goodSender = priv0.GetScriptHash()
var cases = map[string]struct {
params string
check func(*testing.T, *neorpc.Notification)
}{
"matching sender": {
params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
require.Equal(t, "added", rmap["type"].(string))
req := rmap["notaryrequest"].(map[string]any)
fbTx := req["fallbacktx"].(map[string]any)
sender := fbTx["signers"].([]any)[1].(map[string]any)["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), sender)
},
},
"matching signer": {
params: `["notary_request_event", {"signer":"` + goodSender.StringLE() + `"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
require.Equal(t, "added", rmap["type"].(string))
req := rmap["notaryrequest"].(map[string]any)
mainTx := req["maintx"].(map[string]any)
signers := mainTx["signers"].([]any)
signer0 := signers[0].(map[string]any)
signer0acc := signer0["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
},
},
"matching type": {
params: `["notary_request_event", {"type":"added"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, "added", rmap["type"].(string))
},
},
"matching sender, signer and type": {
params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `", "signer":"` + goodSender.StringLE() + `","type":"added"}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
require.Equal(t, "added", rmap["type"].(string))
req := rmap["notaryrequest"].(map[string]any)
mainTx := req["maintx"].(map[string]any)
fbTx := req["fallbacktx"].(map[string]any)
sender := fbTx["signers"].([]any)[1].(map[string]any)["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), sender)
signers := mainTx["signers"].([]any)
signer0 := signers[0].(map[string]any)
signer0acc := signer0["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
},
},
}
chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t, true)
// Blocks are needed to make a GAS deposit for priv0.
blocks := getTestBlocks(t)
for _, b := range blocks {
require.NoError(t, chain.AddBlock(b))
}
var nonce uint32 = 100
for name, this := range cases {
t.Run(name, func(t *testing.T) {
subID := callSubscribe(t, c, respMsgs, this.params)
err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, priv0, nonce, 2_0000_0000, nil))
require.NoError(t, err)
nonce++
var resp = new(neorpc.Notification)
select {
case body := <-respMsgs:
require.NoError(t, json.Unmarshal(body, resp))
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.Equal(t, neorpc.NotaryRequestEventID, resp.Event)
this.check(t, resp)
callUnsubscribe(t, c, respMsgs, subID)
})
}
}
func TestFilteredBlockSubscriptions(t *testing.T) {
// We can't fit this into TestFilteredSubscriptions, because that test uses
// blocks as EOF events to wait for.
const numBlocks = 10
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`)
var expectedCnt int
for i := 0; i < numBlocks; i++ {
primary := uint32(i % 4)
if primary == 3 {
expectedCnt++
}
b := testchain.NewBlock(t, chain, 1, primary)
require.NoError(t, chain.AddBlock(b))
}
for i := 0; i < expectedCnt; i++ {
var resp = new(neorpc.Notification)
select {
case body := <-respMsgs:
require.NoError(t, json.Unmarshal(body, resp))
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.Equal(t, neorpc.BlockEventID, resp.Event)
rmap := resp.Payload[0].(map[string]any)
primary := rmap["primary"].(float64)
require.Equal(t, 3, int(primary))
}
callUnsubscribe(t, c, respMsgs, blockSubID)
}
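
// TestHeaderOfAddedBlockSubscriptions checks the header_of_added_block feed
// filtered by the primary index.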
func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
const numBlocks = 10
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`)
var expectedCnt int
for i := 0; i < numBlocks; i++ {
primary := uint32(i % 4)
if primary == 3 {
expectedCnt++
}
b := testchain.NewBlock(t, chain, 1, primary)
require.NoError(t, chain.AddBlock(b))
}
for i := 0; i < expectedCnt; i++ {
var resp = new(neorpc.Notification)
select {
case body := <-respMsgs:
require.NoError(t, json.Unmarshal(body, resp))
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
rmap := resp.Payload[0].(map[string]any)
primary := rmap["primary"].(float64)
require.Equal(t, 3, int(primary))
}
callUnsubscribe(t, c, respMsgs, headerSubID)
}
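
// TestMaxSubscriptions checks that a single client can't create more than
// maxFeeds subscriptions.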
func TestMaxSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
_, _, c, respMsgs := initCleanServerAndWSClient(t)
for i := 0; i < maxFeeds+1; i++ {
var s string
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_added"], "id": 1}`, respMsgs)
if i < maxFeeds {
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &s))
// Each ID must be unique.
for _, id := range subIDs {
require.NotEqual(t, id, s)
}
subIDs = append(subIDs, s)
} else {
require.NotNil(t, resp.Error)
require.Nil(t, resp.Result)
}
}
}
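
// TestBadSubUnsub checks that malformed subscribe and unsubscribe requests
// are rejected.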
func TestBadSubUnsub(t *testing.T) {
var subCases = map[string]string{
"no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`,
"bad (non-string) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": [1], "id": 1}`,
"bad (wrong) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_removed"], "id": 1}`,
"missed event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["event_missed"], "id": 1}`,
"block invalid filter": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_added", 1], "id": 1}`,
"tx filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_added", 1], "id": 1}`,
"tx filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_added", {"state": "HALT"}], "id": 1}`,
"notification filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["notification_from_execution", "contract"], "id": 1}`,
"notification filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["notification_from_execution", "name"], "id": 1}`,
"execution filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_executed", "FAULT"], "id": 1}`,
"execution filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_executed", {"state": "STOP"}], "id": 1}`,
}
var unsubCases = map[string]string{
"no params": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": [], "id": 1}`,
"bad id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["vasiliy"], "id": 1}`,
"not subscribed id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["7"], "id": 1}`,
}
_, _, c, respMsgs := initCleanServerAndWSClient(t)
testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
return func(t *testing.T) {
for n, s := range cases {
t.Run(n, func(t *testing.T) {
resp := callWSGetRaw(t, c, s, respMsgs)
require.NotNil(t, resp.Error)
require.Nil(t, resp.Result)
})
}
}
}
t.Run("subscribe", testF(t, subCases))
t.Run("unsubscribe", testF(t, unsubCases))
}
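
// doSomeWSRequest performs a simple RPC call over ws to ensure the connection
// is still being served.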
func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
require.NoError(t, ws.SetWriteDeadline(time.Now().Add(5*time.Second)))
// It could be just about anything, including an invalid request;
// we only care about the server handling being active.
require.NoError(t, ws.WriteMessage(websocket.TextMessage, []byte(`{"jsonrpc": "2.0", "method": "getversion", "params": [], "id": 1}`)))
err := ws.SetReadDeadline(time.Now().Add(5 * time.Second))
require.NoError(t, err)
_, _, err = ws.ReadMessage()
require.NoError(t, err)
}
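
// TestWSClientsLimit checks that the server accepts no more than the
// configured number of simultaneous WebSocket clients.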
func TestWSClientsLimit(t *testing.T) {
for tname, limit := range map[string]int{"default": 0, "8": 8, "disabled": -1} {
effectiveClients := limit
if limit == 0 {
effectiveClients = defaultMaxWebSocketClients
} else if limit < 0 {
effectiveClients = 0
}
t.Run(tname, func(t *testing.T) {
_, _, httpSrv := initClearServerWithCustomConfig(t, func(cfg *config.Config) {
cfg.ApplicationConfiguration.RPC.MaxWebSocketClients = limit
})
dialer := websocket.Dialer{HandshakeTimeout: 10 * time.Second}
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
wss := make([]*websocket.Conn, effectiveClients)
var wg sync.WaitGroup
// Dial effectiveClients connections in parallel.
for i := 0; i < effectiveClients; i++ {
wg.Add(1)
j := i
go func() {
defer wg.Done()
ws, r, err := dialer.Dial(url, nil)
if r != nil {
defer r.Body.Close()
}
require.NoError(t, err)
wss[j] = ws
doSomeWSRequest(t, ws)
}()
}
wg.Wait()
// Attempt one more connection, which should fail.
_, r, err := dialer.Dial(url, nil)
require.Error(t, err, "The connection beyond the limit should fail")
if r != nil {
r.Body.Close()
}
// Check that connections are still alive (some use of wss is actually
// necessary here to keep the connections alive).
for _, ws := range wss {
doSomeWSRequest(t, ws)
ws.Close()
}
})
}
}
// The purpose of this test is to overflow the server-side buffers and
// receive a 'missed' event. But it's actually hard to tell when exactly
// that's going to happen because of network-level buffering; the typical
// number seen in tests is around ~3500 events, which is not reliable enough,
// thus this test is disabled.
func TestSubscriptionOverflow(t *testing.T) {
if !testOverflow {
return
}
const blockCnt = notificationBufSize * 5
var receivedMiss bool
chain, _, c, respMsgs := initCleanServerAndWSClient(t)
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
// Push a lot of new blocks, but don't read events for them.
for i := 0; i < blockCnt; i++ {
b := testchain.NewBlock(t, chain, 1, 0)
require.NoError(t, chain.AddBlock(b))
}
for i := 0; i < blockCnt; i++ {
resp := getNotification(t, respMsgs)
if resp.Event != neorpc.BlockEventID {
require.Equal(t, neorpc.MissedEventID, resp.Event)
receivedMiss = true
break
}
}
require.Equal(t, true, receivedMiss)
// `Missed` is the last event and there is nothing afterwards.
require.Equal(t, 0, len(respMsgs))
}
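
// TestFilteredSubscriptions_InvalidFilter checks that subscriptions with
// invalid filters are rejected.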
func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
var cases = map[string]struct {
params string
}{
"notification with long name": {
params: `["notification_from_execution", {"name":"notification_from_execution_with_long_name"}]`,
},
"execution with invalid vm state": {
params: `["transaction_executed", {"state":"NOTHALT"}]`,
},
}
_, _, c, respMsgs := initCleanServerAndWSClient(t)
for name, this := range cases {
t.Run(name, func(t *testing.T) {
resp := callWSGetRaw(t, c, fmt.Sprintf(`{"jsonrpc": "2.0","method": "subscribe","params": %s,"id": 1}`, this.params), respMsgs)
require.NotNil(t, resp.Error)
require.Nil(t, resp.Result)
require.Contains(t, resp.Error.Error(), neorpc.ErrInvalidSubscriptionFilter.Error())
})
}
}