neo-go/pkg/rpc/server/subscription_test.go

325 lines
11 KiB
Go
Raw Normal View History

package server
import (
"encoding/json"
"fmt"
"strings"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/rpc/response"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool) {
for {
ws.SetReadDeadline(time.Now().Add(time.Second))
_, body, err := ws.ReadMessage()
if isFinished.Load() {
require.Error(t, err)
break
}
require.NoError(t, err)
msgCh <- body
}
}
func callWSGetRaw(t *testing.T, ws *websocket.Conn, msg string, respCh <-chan []byte) *response.Raw {
var resp = new(response.Raw)
ws.SetWriteDeadline(time.Now().Add(time.Second))
require.NoError(t, ws.WriteMessage(websocket.TextMessage, []byte(msg)))
body := <-respCh
require.NoError(t, json.Unmarshal(body, resp))
return resp
}
func getNotification(t *testing.T, respCh <-chan []byte) *response.Notification {
var resp = new(response.Notification)
body := <-respCh
require.NoError(t, json.Unmarshal(body, resp))
return resp
}
func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *websocket.Conn, chan []byte, *atomic.Bool) {
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
dialer := websocket.Dialer{HandshakeTimeout: time.Second}
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
ws, _, err := dialer.Dial(url, nil)
require.NoError(t, err)
// Use buffered channel to read server's messages and then read expected
// responses from it.
respMsgs := make(chan []byte, 16)
finishedFlag := atomic.NewBool(false)
go wsReader(t, ws, respMsgs, finishedFlag)
return chain, rpcSrv, ws, respMsgs, finishedFlag
}
func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
var s string
resp := callWSGetRaw(t, ws, fmt.Sprintf(`{"jsonrpc": "2.0","method": "subscribe","params": %s,"id": 1}`, params), msgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &s))
return s
}
func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id string) {
var b bool
resp := callWSGetRaw(t, ws, fmt.Sprintf(`{"jsonrpc": "2.0","method": "unsubscribe","params": ["%s"],"id": 1}`, id), msgs)
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &b))
require.Equal(t, true, b)
}
func TestSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed"}
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
defer rpcSrv.Shutdown()
for _, feed := range subFeeds {
s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed))
subIDs = append(subIDs, s)
}
for _, b := range getTestBlocks(t) {
require.NoError(t, chain.AddBlock(b))
for _, tx := range b.Transactions {
var mayNotify bool
if tx.Type == transaction.InvocationType {
resp := getNotification(t, respMsgs)
require.Equal(t, response.ExecutionEventID, resp.Event)
mayNotify = true
}
for {
resp := getNotification(t, respMsgs)
if mayNotify && resp.Event == response.NotificationEventID {
continue
}
require.Equal(t, response.TransactionEventID, resp.Event)
break
}
}
resp := getNotification(t, respMsgs)
require.Equal(t, response.BlockEventID, resp.Event)
}
for _, id := range subIDs {
callUnsubscribe(t, c, respMsgs, id)
}
finishedFlag.CAS(false, true)
c.Close()
}
func TestFilteredSubscriptions(t *testing.T) {
var cases = map[string]struct {
params string
check func(*testing.T, *response.Notification)
}{
"tx matching": {
params: `["transaction_added", {"type":"InvocationTransaction"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.TransactionEventID, resp.Event)
typ := rmap["type"].(string)
require.Equal(t, "InvocationTransaction", typ)
},
},
"notification matching": {
params: `["notification_from_execution", {"contract":"` + testContractHash + `"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.NotificationEventID, resp.Event)
c := rmap["contract"].(string)
require.Equal(t, "0x"+testContractHash, c)
},
},
"execution matching": {
params: `["transaction_executed", {"state":"HALT"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.ExecutionEventID, resp.Event)
execs := rmap["executions"].([]interface{})
exec0 := execs[0].(map[string]interface{})
st := exec0["vmstate"].(string)
require.Equal(t, "HALT", st)
},
},
"tx non-matching": {
params: `["transaction_added", {"type":"EnrollmentTransaction"}]`,
check: func(t *testing.T, _ *response.Notification) {
t.Fatal("unexpected match for EnrollmentTransaction")
},
},
"notification non-matching": {
params: `["notification_from_execution", {"contract":"00112233445566778899aabbccddeeff00112233"}]`,
check: func(t *testing.T, _ *response.Notification) {
t.Fatal("unexpected match for contract 00112233445566778899aabbccddeeff00112233")
},
},
"execution non-matching": {
params: `["transaction_executed", {"state":"FAULT"}]`,
check: func(t *testing.T, _ *response.Notification) {
t.Fatal("unexpected match for faulted execution")
},
},
}
for name, this := range cases {
t.Run(name, func(t *testing.T) {
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
defer rpcSrv.Shutdown()
// It's used as an end-of-event-stream, so it's always present.
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
subID := callSubscribe(t, c, respMsgs, this.params)
var lastBlock uint32
for _, b := range getTestBlocks(t) {
require.NoError(t, chain.AddBlock(b))
lastBlock = b.Index
}
for {
resp := getNotification(t, respMsgs)
rmap := resp.Payload[0].(map[string]interface{})
if resp.Event == response.BlockEventID {
index := rmap["index"].(float64)
if uint32(index) == lastBlock {
break
}
continue
}
this.check(t, resp)
}
callUnsubscribe(t, c, respMsgs, subID)
callUnsubscribe(t, c, respMsgs, blockSubID)
finishedFlag.CAS(false, true)
c.Close()
})
}
}
func TestMaxSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
defer rpcSrv.Shutdown()
for i := 0; i < maxFeeds+1; i++ {
var s string
resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_added"], "id": 1}`, respMsgs)
if i < maxFeeds {
require.Nil(t, resp.Error)
require.NotNil(t, resp.Result)
require.NoError(t, json.Unmarshal(resp.Result, &s))
// Each ID must be unique.
for _, id := range subIDs {
require.NotEqual(t, id, s)
}
subIDs = append(subIDs, s)
} else {
require.NotNil(t, resp.Error)
require.Nil(t, resp.Result)
}
}
finishedFlag.CAS(false, true)
c.Close()
}
func TestBadSubUnsub(t *testing.T) {
var subCases = map[string]string{
"no params": `{"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 1}`,
"bad (non-string) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": [1], "id": 1}`,
"bad (wrong) event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_removed"], "id": 1}`,
"missed event": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["event_missed"], "id": 1}`,
"block invalid filter": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["block_added", 1], "id": 1}`,
"tx filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_added", 1], "id": 1}`,
"tx filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_added", {"state": "HALT"}], "id": 1}`,
"notification filter": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["notification_from_execution", "contract"], "id": 1}`,
"execution filter 1": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_executed", "FAULT"], "id": 1}`,
"execution filter 2": `{"jsonrpc": "2.0", "method": "subscribe", "params": ["transaction_executed", {"state": "STOP"}], "id": 1}`,
}
var unsubCases = map[string]string{
"no params": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": [], "id": 1}`,
"bad id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["vasiliy"], "id": 1}`,
"not subscribed id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["7"], "id": 1}`,
}
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
defer rpcSrv.Shutdown()
testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
return func(t *testing.T) {
for n, s := range cases {
t.Run(n, func(t *testing.T) {
resp := callWSGetRaw(t, c, s, respMsgs)
require.NotNil(t, resp.Error)
require.Nil(t, resp.Result)
})
}
}
}
t.Run("subscribe", testF(t, subCases))
t.Run("unsubscribe", testF(t, unsubCases))
finishedFlag.CAS(false, true)
c.Close()
}
func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
ws.SetWriteDeadline(time.Now().Add(time.Second))
// It could be just about anything including invalid request,
// we only care about server handling being active.
require.NoError(t, ws.WriteMessage(websocket.TextMessage, []byte(`{"jsonrpc": "2.0", "method": "getversion", "params": [], "id": 1}`)))
ws.SetReadDeadline(time.Now().Add(time.Second))
_, _, err := ws.ReadMessage()
require.NoError(t, err)
}
func TestWSClientsLimit(t *testing.T) {
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)
defer chain.Close()
defer rpcSrv.Shutdown()
dialer := websocket.Dialer{HandshakeTimeout: time.Second}
url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws"
wss := make([]*websocket.Conn, maxSubscribers)
for i := 0; i < len(wss)+1; i++ {
ws, _, err := dialer.Dial(url, nil)
if i < maxSubscribers {
require.NoError(t, err)
wss[i] = ws
// Check that it's completely ready.
doSomeWSRequest(t, ws)
} else {
require.Error(t, err)
}
}
// Check connections are still alive (it actually is necessary to add
// some use of wss to keep connections alive).
for i := 0; i < len(wss); i++ {
doSomeWSRequest(t, wss[i])
}
}