diff --git a/pkg/rpc/client/wsclient.go b/pkg/rpc/client/wsclient.go index ff8dfce19..115fa2a64 100644 --- a/pkg/rpc/client/wsclient.go +++ b/pkg/rpc/client/wsclient.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/rpc/request" "github.com/nspcc-dev/neo-go/pkg/rpc/response" "github.com/nspcc-dev/neo-go/pkg/rpc/response/result" + "github.com/nspcc-dev/neo-go/pkg/util" ) // WSClient is a websocket-enabled RPC client that can be used with appropriate @@ -246,23 +247,40 @@ func (c *WSClient) SubscribeForNewBlocks() (string, error) { } // SubscribeForNewTransactions adds subscription for new transaction events to -// this instance of client. -func (c *WSClient) SubscribeForNewTransactions() (string, error) { +// this instance of client. It can be filtered by transaction type, nil value +// is treated as missing filter. +func (c *WSClient) SubscribeForNewTransactions(txType *transaction.TXType) (string, error) { params := request.NewRawParams("transaction_added") + if txType != nil { + params.Values = append(params.Values, request.TxFilter{Type: *txType}) + } return c.performSubscription(params) } // SubscribeForExecutionNotifications adds subscription for notifications -// generated during transaction execution to this instance of client. -func (c *WSClient) SubscribeForExecutionNotifications() (string, error) { +// generated during transaction execution to this instance of client. It can be +// filtered by contract's hash (that emits notifications), nil value puts no such +// restrictions. +func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160) (string, error) { params := request.NewRawParams("notification_from_execution") + if contract != nil { + params.Values = append(params.Values, request.NotificationFilter{Contract: *contract}) + } return c.performSubscription(params) } // SubscribeForTransactionExecutions adds subscription for application execution -// results generated during transaction execution to this instance of client. -func (c *WSClient) SubscribeForTransactionExecutions() (string, error) { +// results generated during transaction execution to this instance of client. Can +// be filtered by state (HALT/FAULT) to check for successful or failing +// transactions, nil value means no filtering. +func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) { params := request.NewRawParams("transaction_executed") + if state != nil { + if *state != "HALT" && *state != "FAULT" { + return "", errors.New("bad state parameter") + } + params.Values = append(params.Values, request.ExecutionFilter{State: *state}) + } return c.performSubscription(params) } diff --git a/pkg/rpc/client/wsclient_test.go b/pkg/rpc/client/wsclient_test.go index 1fd7d68ca..63643bdfb 100644 --- a/pkg/rpc/client/wsclient_test.go +++ b/pkg/rpc/client/wsclient_test.go @@ -8,6 +8,9 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/rpc/request" + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/stretchr/testify/require" ) @@ -21,10 +24,16 @@ func TestWSClientClose(t *testing.T) { func TestWSClientSubscription(t *testing.T) { var cases = map[string]func(*WSClient) (string, error){ - "blocks": (*WSClient).SubscribeForNewBlocks, - "transactions": (*WSClient).SubscribeForNewTransactions, - "notifications": (*WSClient).SubscribeForExecutionNotifications, - "executions": (*WSClient).SubscribeForTransactionExecutions, + "blocks": (*WSClient).SubscribeForNewBlocks, + "transactions": func(wsc *WSClient) (string, error) { + return wsc.SubscribeForNewTransactions(nil) + }, + "notifications": func(wsc *WSClient) (string, error) { + return wsc.SubscribeForExecutionNotifications(nil) + }, + "executions": func(wsc *WSClient) (string, error) { + return wsc.SubscribeForTransactionExecutions(nil) + }, } t.Run("good", func(t *testing.T) { for name, f := range cases { @@ -145,3 +154,96 @@ func TestWSClientEvents(t *testing.T) { // Connection closed by server. require.Equal(t, false, ok) } + +func TestWSExecutionVMStateCheck(t *testing.T) { + // Will answer successfully if request slips through. + srv := initTestServer(t, `{"jsonrpc": "2.0", "id": 1, "result": "55aaff00"}`) + defer srv.Close() + wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) + require.NoError(t, err) + filter := "NONE" + _, err = wsc.SubscribeForTransactionExecutions(&filter) + require.Error(t, err) + wsc.Close() +} + +func TestWSFilteredSubscriptions(t *testing.T) { + var cases = []struct { + name string + clientCode func(*testing.T, *WSClient) + serverCode func(*testing.T, *request.Params) + }{ + {"transactions", + func(t *testing.T, wsc *WSClient) { + tt := transaction.InvocationType + _, err := wsc.SubscribeForNewTransactions(&tt) + require.NoError(t, err) + }, + func(t *testing.T, p *request.Params) { + param, ok := p.Value(1) + require.Equal(t, true, ok) + require.Equal(t, request.TxFilterT, param.Type) + filt, ok := param.Value.(request.TxFilter) + require.Equal(t, true, ok) + require.Equal(t, transaction.InvocationType, filt.Type) + }, + }, + {"notifications", + func(t *testing.T, wsc *WSClient) { + contract := util.Uint160{1, 2, 3, 4, 5} + _, err := wsc.SubscribeForExecutionNotifications(&contract) + require.NoError(t, err) + }, + func(t *testing.T, p *request.Params) { + param, ok := p.Value(1) + require.Equal(t, true, ok) + require.Equal(t, request.NotificationFilterT, param.Type) + filt, ok := param.Value.(request.NotificationFilter) + require.Equal(t, true, ok) + require.Equal(t, util.Uint160{1, 2, 3, 4, 5}, filt.Contract) + }, + }, + {"executions", + func(t *testing.T, wsc *WSClient) { + state := "FAULT" + _, err := wsc.SubscribeForTransactionExecutions(&state) + require.NoError(t, err) + }, + func(t *testing.T, p *request.Params) { + param, ok := p.Value(1) + require.Equal(t, true, ok) + require.Equal(t, request.ExecutionFilterT, param.Type) + filt, ok := param.Value.(request.ExecutionFilter) + require.Equal(t, true, ok) + require.Equal(t, "FAULT", filt.State) + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.URL.Path == "/ws" && req.Method == "GET" { + var upgrader = websocket.Upgrader{} + ws, err := upgrader.Upgrade(w, req, nil) + require.NoError(t, err) + ws.SetReadDeadline(time.Now().Add(2 * time.Second)) + req := request.In{} + err = ws.ReadJSON(&req) + require.NoError(t, err) + params, err := req.Params() + require.NoError(t, err) + c.serverCode(t, params) + ws.SetWriteDeadline(time.Now().Add(2 * time.Second)) + err = ws.WriteMessage(1, []byte(`{"jsonrpc": "2.0", "id": 1, "result": "0"}`)) + require.NoError(t, err) + ws.Close() + } + })) + defer srv.Close() + wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) + require.NoError(t, err) + c.clientCode(t, wsc) + wsc.Close() + }) + } +} diff --git a/pkg/rpc/request/param.go b/pkg/rpc/request/param.go index 55052990e..ef3bdd155 100644 --- a/pkg/rpc/request/param.go +++ b/pkg/rpc/request/param.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/util" @@ -29,6 +30,23 @@ type ( Type smartcontract.ParamType `json:"type"` Value Param `json:"value"` } + // TxFilter is a wrapper structure for transaction event filter. The only + // allowed filter is a transaction type for now. + TxFilter struct { + Type transaction.TXType `json:"type"` + } + // NotificationFilter is a wrapper structure representing filter used for + // notifications generated during transaction execution. Notifications can + // only be filtered by contract hash. + NotificationFilter struct { + Contract util.Uint160 `json:"contract"` + } + // ExecutionFilter is a wrapper structure used for transaction execution + // events. It allows to choose failing or successful transactions based + // on their VM state. + ExecutionFilter struct { + State string `json:"state"` + } ) // These are parameter types accepted by RPC server. @@ -38,6 +56,9 @@ const ( NumberT ArrayT FuncParamT + TxFilterT + NotificationFilterT + ExecutionFilterT ) func (p Param) String() string { @@ -130,38 +151,43 @@ func (p Param) GetBytesHex() ([]byte, error) { // UnmarshalJSON implements json.Unmarshaler interface. func (p *Param) UnmarshalJSON(data []byte) error { var s string - if err := json.Unmarshal(data, &s); err == nil { - p.Type = StringT - p.Value = s - - return nil - } - var num float64 - if err := json.Unmarshal(data, &num); err == nil { - p.Type = NumberT - p.Value = int(num) - - return nil + // To unmarshal correctly we need to pass pointers into the decoder. + var attempts = [...]Param{ + {NumberT, &num}, + {StringT, &s}, + {FuncParamT, &FuncParam{}}, + {TxFilterT, &TxFilter{}}, + {NotificationFilterT, &NotificationFilter{}}, + {ExecutionFilterT, &ExecutionFilter{}}, + {ArrayT, &[]Param{}}, } - r := bytes.NewReader(data) - jd := json.NewDecoder(r) - jd.DisallowUnknownFields() - var fp FuncParam - if err := jd.Decode(&fp); err == nil { - p.Type = FuncParamT - p.Value = fp - - return nil - } - - var ps []Param - if err := json.Unmarshal(data, &ps); err == nil { - p.Type = ArrayT - p.Value = ps - - return nil + for _, cur := range attempts { + r := bytes.NewReader(data) + jd := json.NewDecoder(r) + jd.DisallowUnknownFields() + if err := jd.Decode(cur.Value); err == nil { + p.Type = cur.Type + // But we need to store actual values, not pointers. + switch val := cur.Value.(type) { + case *float64: + p.Value = int(*val) + case *string: + p.Value = *val + case *FuncParam: + p.Value = *val + case *TxFilter: + p.Value = *val + case *NotificationFilter: + p.Value = *val + case *ExecutionFilter: + p.Value = *val + case *[]Param: + p.Value = *val + } + return nil + } } return errors.New("unknown type") diff --git a/pkg/rpc/request/param_test.go b/pkg/rpc/request/param_test.go index 26d29bab1..da04ea540 100644 --- a/pkg/rpc/request/param_test.go +++ b/pkg/rpc/request/param_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "testing" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/util" @@ -13,7 +14,12 @@ import ( ) func TestParam_UnmarshalJSON(t *testing.T) { - msg := `["str1", 123, ["str2", 3], [{"type": "String", "value": "jajaja"}]]` + msg := `["str1", 123, ["str2", 3], [{"type": "String", "value": "jajaja"}], + {"type": "MinerTransaction"}, + {"contract": "f84d6a337fbc3d3a201d41da99e86b479e7a2554"}, + {"state": "HALT"}]` + contr, err := util.Uint160DecodeStringLE("f84d6a337fbc3d3a201d41da99e86b479e7a2554") + require.NoError(t, err) expected := Params{ { Type: StringT, @@ -51,6 +57,18 @@ func TestParam_UnmarshalJSON(t *testing.T) { }, }, }, + { + Type: TxFilterT, + Value: TxFilter{Type: transaction.MinerType}, + }, + { + Type: NotificationFilterT, + Value: NotificationFilter{Contract: contr}, + }, + { + Type: ExecutionFilterT, + Value: ExecutionFilter{State: "HALT"}, + }, } var ps Params