rpc/client: add support for notification filters

Differing a bit from #895 draft specification, we won't add `verifier` (or
signer) for Neo 2, it's not worth doing so at the moment.
This commit is contained in:
Roman Khimov 2020-05-13 13:16:42 +03:00
parent da32cff313
commit 78716c5335
4 changed files with 204 additions and 40 deletions

View file

@ -12,6 +12,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response"
"github.com/nspcc-dev/neo-go/pkg/rpc/response/result" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result"
"github.com/nspcc-dev/neo-go/pkg/util"
) )
// WSClient is a websocket-enabled RPC client that can be used with appropriate // WSClient is a websocket-enabled RPC client that can be used with appropriate
@ -246,23 +247,40 @@ func (c *WSClient) SubscribeForNewBlocks() (string, error) {
} }
// SubscribeForNewTransactions adds subscription for new transaction events to // SubscribeForNewTransactions adds subscription for new transaction events to
// this instance of client. // this instance of client. It can be filtered by transaction type, nil value
func (c *WSClient) SubscribeForNewTransactions() (string, error) { // is treated as missing filter.
func (c *WSClient) SubscribeForNewTransactions(txType *transaction.TXType) (string, error) {
params := request.NewRawParams("transaction_added") params := request.NewRawParams("transaction_added")
if txType != nil {
params.Values = append(params.Values, request.TxFilter{Type: *txType})
}
return c.performSubscription(params) return c.performSubscription(params)
} }
// SubscribeForExecutionNotifications adds subscription for notifications // SubscribeForExecutionNotifications adds subscription for notifications
// generated during transaction execution to this instance of client. // generated during transaction execution to this instance of client. It can be
func (c *WSClient) SubscribeForExecutionNotifications() (string, error) { // filtered by contract's hash (that emits notifications), nil value puts no such
// restrictions.
func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160) (string, error) {
params := request.NewRawParams("notification_from_execution") params := request.NewRawParams("notification_from_execution")
if contract != nil {
params.Values = append(params.Values, request.NotificationFilter{Contract: *contract})
}
return c.performSubscription(params) return c.performSubscription(params)
} }
// SubscribeForTransactionExecutions adds subscription for application execution // SubscribeForTransactionExecutions adds subscription for application execution
// results generated during transaction execution to this instance of client. // results generated during transaction execution to this instance of client. Can
func (c *WSClient) SubscribeForTransactionExecutions() (string, error) { // be filtered by state (HALT/FAULT) to check for successful or failing
// transactions, nil value means no filtering.
func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) {
params := request.NewRawParams("transaction_executed") params := request.NewRawParams("transaction_executed")
if state != nil {
if *state != "HALT" && *state != "FAULT" {
return "", errors.New("bad state parameter")
}
params.Values = append(params.Values, request.ExecutionFilter{State: *state})
}
return c.performSubscription(params) return c.performSubscription(params)
} }

View file

@ -8,6 +8,9 @@ import (
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/rpc/request"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -21,10 +24,16 @@ func TestWSClientClose(t *testing.T) {
func TestWSClientSubscription(t *testing.T) { func TestWSClientSubscription(t *testing.T) {
var cases = map[string]func(*WSClient) (string, error){ var cases = map[string]func(*WSClient) (string, error){
"blocks": (*WSClient).SubscribeForNewBlocks, "blocks": (*WSClient).SubscribeForNewBlocks,
"transactions": (*WSClient).SubscribeForNewTransactions, "transactions": func(wsc *WSClient) (string, error) {
"notifications": (*WSClient).SubscribeForExecutionNotifications, return wsc.SubscribeForNewTransactions(nil)
"executions": (*WSClient).SubscribeForTransactionExecutions, },
"notifications": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForExecutionNotifications(nil)
},
"executions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutions(nil)
},
} }
t.Run("good", func(t *testing.T) { t.Run("good", func(t *testing.T) {
for name, f := range cases { for name, f := range cases {
@ -145,3 +154,96 @@ func TestWSClientEvents(t *testing.T) {
// Connection closed by server. // Connection closed by server.
require.Equal(t, false, ok) require.Equal(t, false, ok)
} }
func TestWSExecutionVMStateCheck(t *testing.T) {
// Will answer successfully if request slips through.
srv := initTestServer(t, `{"jsonrpc": "2.0", "id": 1, "result": "55aaff00"}`)
defer srv.Close()
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
filter := "NONE"
_, err = wsc.SubscribeForTransactionExecutions(&filter)
require.Error(t, err)
wsc.Close()
}
func TestWSFilteredSubscriptions(t *testing.T) {
var cases = []struct {
name string
clientCode func(*testing.T, *WSClient)
serverCode func(*testing.T, *request.Params)
}{
{"transactions",
func(t *testing.T, wsc *WSClient) {
tt := transaction.InvocationType
_, err := wsc.SubscribeForNewTransactions(&tt)
require.NoError(t, err)
},
func(t *testing.T, p *request.Params) {
param, ok := p.Value(1)
require.Equal(t, true, ok)
require.Equal(t, request.TxFilterT, param.Type)
filt, ok := param.Value.(request.TxFilter)
require.Equal(t, true, ok)
require.Equal(t, transaction.InvocationType, filt.Type)
},
},
{"notifications",
func(t *testing.T, wsc *WSClient) {
contract := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.SubscribeForExecutionNotifications(&contract)
require.NoError(t, err)
},
func(t *testing.T, p *request.Params) {
param, ok := p.Value(1)
require.Equal(t, true, ok)
require.Equal(t, request.NotificationFilterT, param.Type)
filt, ok := param.Value.(request.NotificationFilter)
require.Equal(t, true, ok)
require.Equal(t, util.Uint160{1, 2, 3, 4, 5}, filt.Contract)
},
},
{"executions",
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
_, err := wsc.SubscribeForTransactionExecutions(&state)
require.NoError(t, err)
},
func(t *testing.T, p *request.Params) {
param, ok := p.Value(1)
require.Equal(t, true, ok)
require.Equal(t, request.ExecutionFilterT, param.Type)
filt, ok := param.Value.(request.ExecutionFilter)
require.Equal(t, true, ok)
require.Equal(t, "FAULT", filt.State)
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" {
var upgrader = websocket.Upgrader{}
ws, err := upgrader.Upgrade(w, req, nil)
require.NoError(t, err)
ws.SetReadDeadline(time.Now().Add(2 * time.Second))
req := request.In{}
err = ws.ReadJSON(&req)
require.NoError(t, err)
params, err := req.Params()
require.NoError(t, err)
c.serverCode(t, params)
ws.SetWriteDeadline(time.Now().Add(2 * time.Second))
err = ws.WriteMessage(1, []byte(`{"jsonrpc": "2.0", "id": 1, "result": "0"}`))
require.NoError(t, err)
ws.Close()
}
}))
defer srv.Close()
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
c.clientCode(t, wsc)
wsc.Close()
})
}
}

View file

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -29,6 +30,23 @@ type (
Type smartcontract.ParamType `json:"type"` Type smartcontract.ParamType `json:"type"`
Value Param `json:"value"` Value Param `json:"value"`
} }
// TxFilter is a wrapper structure for transaction event filter. The only
// allowed filter is a transaction type for now.
TxFilter struct {
Type transaction.TXType `json:"type"`
}
// NotificationFilter is a wrapper structure representing filter used for
// notifications generated during transaction execution. Notifications can
// only be filtered by contract hash.
NotificationFilter struct {
Contract util.Uint160 `json:"contract"`
}
// ExecutionFilter is a wrapper structure used for transaction execution
// events. It allows to choose failing or successful transactions based
// on their VM state.
ExecutionFilter struct {
State string `json:"state"`
}
) )
// These are parameter types accepted by RPC server. // These are parameter types accepted by RPC server.
@ -38,6 +56,9 @@ const (
NumberT NumberT
ArrayT ArrayT
FuncParamT FuncParamT
TxFilterT
NotificationFilterT
ExecutionFilterT
) )
func (p Param) String() string { func (p Param) String() string {
@ -130,38 +151,43 @@ func (p Param) GetBytesHex() ([]byte, error) {
// UnmarshalJSON implements json.Unmarshaler interface. // UnmarshalJSON implements json.Unmarshaler interface.
func (p *Param) UnmarshalJSON(data []byte) error { func (p *Param) UnmarshalJSON(data []byte) error {
var s string var s string
if err := json.Unmarshal(data, &s); err == nil {
p.Type = StringT
p.Value = s
return nil
}
var num float64 var num float64
if err := json.Unmarshal(data, &num); err == nil { // To unmarshal correctly we need to pass pointers into the decoder.
p.Type = NumberT var attempts = [...]Param{
p.Value = int(num) {NumberT, &num},
{StringT, &s},
return nil {FuncParamT, &FuncParam{}},
{TxFilterT, &TxFilter{}},
{NotificationFilterT, &NotificationFilter{}},
{ExecutionFilterT, &ExecutionFilter{}},
{ArrayT, &[]Param{}},
} }
r := bytes.NewReader(data) for _, cur := range attempts {
jd := json.NewDecoder(r) r := bytes.NewReader(data)
jd.DisallowUnknownFields() jd := json.NewDecoder(r)
var fp FuncParam jd.DisallowUnknownFields()
if err := jd.Decode(&fp); err == nil { if err := jd.Decode(cur.Value); err == nil {
p.Type = FuncParamT p.Type = cur.Type
p.Value = fp // But we need to store actual values, not pointers.
switch val := cur.Value.(type) {
return nil case *float64:
} p.Value = int(*val)
case *string:
var ps []Param p.Value = *val
if err := json.Unmarshal(data, &ps); err == nil { case *FuncParam:
p.Type = ArrayT p.Value = *val
p.Value = ps case *TxFilter:
p.Value = *val
return nil case *NotificationFilter:
p.Value = *val
case *ExecutionFilter:
p.Value = *val
case *[]Param:
p.Value = *val
}
return nil
}
} }
return errors.New("unknown type") return errors.New("unknown type")

View file

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"testing" "testing"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
@ -13,7 +14,12 @@ import (
) )
func TestParam_UnmarshalJSON(t *testing.T) { func TestParam_UnmarshalJSON(t *testing.T) {
msg := `["str1", 123, ["str2", 3], [{"type": "String", "value": "jajaja"}]]` msg := `["str1", 123, ["str2", 3], [{"type": "String", "value": "jajaja"}],
{"type": "MinerTransaction"},
{"contract": "f84d6a337fbc3d3a201d41da99e86b479e7a2554"},
{"state": "HALT"}]`
contr, err := util.Uint160DecodeStringLE("f84d6a337fbc3d3a201d41da99e86b479e7a2554")
require.NoError(t, err)
expected := Params{ expected := Params{
{ {
Type: StringT, Type: StringT,
@ -51,6 +57,18 @@ func TestParam_UnmarshalJSON(t *testing.T) {
}, },
}, },
}, },
{
Type: TxFilterT,
Value: TxFilter{Type: transaction.MinerType},
},
{
Type: NotificationFilterT,
Value: NotificationFilter{Contract: contr},
},
{
Type: ExecutionFilterT,
Value: ExecutionFilter{State: "HALT"},
},
} }
var ps Params var ps Params