From d7c1f3eac7da319dd0be6d79fdd7ecff8ee59add Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 18 Oct 2022 15:20:55 +0300 Subject: [PATCH] rpc: add "container" filter to WS execution notifications --- docs/notifications.md | 5 ++-- pkg/neorpc/rpcevent/filter.go | 4 ++- pkg/neorpc/rpcevent/filter_test.go | 18 +++++++++-- pkg/neorpc/types.go | 3 +- pkg/rpcclient/actor/waiter.go | 4 +-- pkg/rpcclient/wsclient.go | 19 +++++++----- pkg/rpcclient/wsclient_test.go | 48 +++++++++++++++++++++++++----- pkg/services/rpcsrv/server.go | 2 +- 8 files changed, 77 insertions(+), 26 deletions(-) diff --git a/docs/notifications.md b/docs/notifications.md index 7c857fda8..a35867a02 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -19,7 +19,7 @@ Currently supported events: Contents: container hash, contract hash, notification name, stack item. Filters: contract hash, notification name. * transaction executed - Contents: application execution result. Filters: VM state. + Contents: application execution result. Filters: VM state, script container hash. * new/removed P2P notary request (if `P2PSigExtensions` are enabled) Contents: P2P notary request. Filters: request sender and main tx signer. @@ -69,7 +69,8 @@ Recognized stream names: notification name. * `transaction_executed` Filter: `state` field containing `HALT` or `FAULT` string for successful - and failed executions respectively. + and failed executions respectively and/or `container` field containing + script container hash. * `notary_request_event` Filter: `sender` field containing a string with hex-encoded Uint160 (LE representation) for notary request's `Sender` and/or `signer` in the same diff --git a/pkg/neorpc/rpcevent/filter.go b/pkg/neorpc/rpcevent/filter.go index 07f0dc9f7..65de9df59 100644 --- a/pkg/neorpc/rpcevent/filter.go +++ b/pkg/neorpc/rpcevent/filter.go @@ -64,7 +64,9 @@ func Matches(f Comparator, r Container) bool { case neorpc.ExecutionEventID: filt := filter.(neorpc.ExecutionFilter) applog := r.EventPayload().(*state.AppExecResult) - return applog.VMState.String() == filt.State + stateOK := filt.State == nil || applog.VMState.String() == *filt.State + containerOK := filt.Container == nil || applog.Container.Equals(*filt.Container) + return stateOK && containerOK case neorpc.NotaryRequestEventID: filt := filter.(neorpc.TxFilter) req := r.EventPayload().(*result.NotaryRequestEvent) diff --git a/pkg/neorpc/rpcevent/filter_test.go b/pkg/neorpc/rpcevent/filter_test.go index 71edeb7d1..9e36a7b54 100644 --- a/pkg/neorpc/rpcevent/filter_test.go +++ b/pkg/neorpc/rpcevent/filter_test.go @@ -47,6 +47,8 @@ func TestMatches(t *testing.T) { signer := util.Uint160{4, 5, 6} contract := util.Uint160{7, 8, 9} badUint160 := util.Uint160{9, 9, 9} + cnt := util.Uint256{1, 2, 3} + badUint256 := util.Uint256{9, 9, 9} name := "ntf name" badName := "bad name" bContainer := testContainer{ @@ -56,6 +58,7 @@ func TestMatches(t *testing.T) { }, } st := vmstate.Halt + goodState := st.String() badState := "FAULT" txContainer := testContainer{ id: neorpc.TransactionEventID, @@ -67,7 +70,7 @@ func TestMatches(t *testing.T) { } exContainer := testContainer{ id: neorpc.ExecutionEventID, - pld: &state.AppExecResult{Execution: state.Execution{VMState: st}}, + pld: &state.AppExecResult{Container: cnt, Execution: state.Execution{VMState: st}}, } ntrContainer := testContainer{ id: neorpc.NotaryRequestEventID, @@ -208,7 +211,16 @@ func TestMatches(t *testing.T) { name: "execution, state mismatch", comparator: testComparator{ id: neorpc.ExecutionEventID, - filter: neorpc.ExecutionFilter{State: badState}, + filter: neorpc.ExecutionFilter{State: &badState}, + }, + container: exContainer, + expected: false, + }, + { + name: "execution, container mismatch", + comparator: testComparator{ + id: neorpc.ExecutionEventID, + filter: neorpc.ExecutionFilter{Container: &badUint256}, }, container: exContainer, expected: false, @@ -217,7 +229,7 @@ func TestMatches(t *testing.T) { name: "execution, filter mismatch", comparator: testComparator{ id: neorpc.ExecutionEventID, - filter: neorpc.ExecutionFilter{State: st.String()}, + filter: neorpc.ExecutionFilter{State: &goodState, Container: &cnt}, }, container: exContainer, expected: true, diff --git a/pkg/neorpc/types.go b/pkg/neorpc/types.go index 70bab912c..f5cf2882f 100644 --- a/pkg/neorpc/types.go +++ b/pkg/neorpc/types.go @@ -95,7 +95,8 @@ type ( // events. It allows to choose failing or successful transactions based // on their VM state. ExecutionFilter struct { - State string `json:"state"` + State *string `json:"state,omitempty"` + Container *util.Uint256 `json:"container,omitempty"` } // SignerWithWitness represents transaction's signer with the corresponding witness. SignerWithWitness struct { diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index b48ba0f3a..5623e16de 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -45,7 +45,7 @@ type ( RPCPollingWaiter SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) - SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error) + SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) Unsubscribe(id string) error } ) @@ -156,7 +156,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( waitErr = fmt.Errorf(errFmt, errArgs...) } }() - txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, rcvr) + txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, nil, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) return diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 9a6d4da90..8eafd55af 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -475,23 +475,26 @@ func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uin // SubscribeForTransactionExecutions adds subscription for application execution // results generated during transaction execution to this instance of the client. It can // be filtered by state (HALT/FAULT) to check for successful or failing -// transactions, nil value means no filtering. -func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) { - return c.SubscribeForTransactionExecutionsWithChan(state, c.Notifications) +// transactions; it can also be filtered by script container hash. +// nil value means no filtering. +func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) { + return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications) } // SubscribeForTransactionExecutionsWithChan registers provided channel as a // receiver for the specified execution notifications. The receiver channel must be // properly read and drained after usage in order not to block other notification // receivers. See SubscribeForTransactionExecutions for parameter details. -func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- Notification) (string, error) { +func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"transaction_executed"} var flt *neorpc.ExecutionFilter - if state != nil { - if *state != "HALT" && *state != "FAULT" { - return "", errors.New("bad state parameter") + if state != nil || container != nil { + if state != nil { + if *state != "HALT" && *state != "FAULT" { + return "", errors.New("bad state parameter") + } } - flt = &neorpc.ExecutionFilter{State: *state} + flt = &neorpc.ExecutionFilter{State: state, Container: container} params = append(params, *flt) } rcvr := notificationReceiver{ diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 6f6ec9bdf..3788182e5 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -53,10 +53,10 @@ func TestWSClientSubscription(t *testing.T) { return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch) }, "executions": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutions(nil) + return wsc.SubscribeForTransactionExecutions(nil, nil) }, "executions_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutionsWithChan(nil, ch) + return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch) }, } t.Run("good", func(t *testing.T) { @@ -206,6 +206,8 @@ func TestWSClientEvents(t *testing.T) { ch1 := make(chan Notification) ch2 := make(chan Notification) ch3 := make(chan Notification) + halt := "HALT" + fault := "FAULT" wsc.subscriptionsLock.Lock() wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} @@ -213,8 +215,8 @@ func TestWSClientEvents(t *testing.T) { wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1} wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions - wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: "HALT"}, ch: ch2} - wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: "FAULT"}, ch: ch3} + wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &halt}, ch: ch2} + wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &fault}, ch: ch3} // MissedEvent must be delivered without subscription. wsc.subscriptionsLock.Unlock() @@ -272,7 +274,7 @@ func TestWSExecutionVMStateCheck(t *testing.T) { wsc.getNextRequestID = getTestRequestID require.NoError(t, wsc.Init()) filter := "NONE" - _, err = wsc.SubscribeForTransactionExecutions(&filter) + _, err = wsc.SubscribeForTransactionExecutions(&filter, nil) require.Error(t, err) wsc.Close() } @@ -414,17 +416,47 @@ func TestWSFilteredSubscriptions(t *testing.T) { require.Equal(t, "my_pretty_notification", *filt.Name) }, }, - {"executions", + {"executions state", func(t *testing.T, wsc *WSClient) { state := "FAULT" - _, err := wsc.SubscribeForTransactionExecutions(&state) + _, err := wsc.SubscribeForTransactionExecutions(&state, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { param := p.Value(1) filt := new(neorpc.ExecutionFilter) require.NoError(t, json.Unmarshal(param.RawMessage, filt)) - require.Equal(t, "FAULT", filt.State) + require.Equal(t, "FAULT", *filt.State) + require.Equal(t, (*util.Uint256)(nil), filt.Container) + }, + }, + {"executions container", + func(t *testing.T, wsc *WSClient) { + container := util.Uint256{1, 2, 3} + _, err := wsc.SubscribeForTransactionExecutions(nil, &container) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.ExecutionFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, (*string)(nil), filt.State) + require.Equal(t, util.Uint256{1, 2, 3}, *filt.Container) + }, + }, + {"executions state and container", + func(t *testing.T, wsc *WSClient) { + state := "FAULT" + container := util.Uint256{1, 2, 3} + _, err := wsc.SubscribeForTransactionExecutions(&state, &container) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.ExecutionFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, "FAULT", *filt.State) + require.Equal(t, util.Uint256{1, 2, 3}, *filt.Container) }, }, } diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index d5874e92d..5f8d4af9e 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -2426,7 +2426,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{ case neorpc.ExecutionEventID: flt := new(neorpc.ExecutionFilter) err = jd.Decode(flt) - if err == nil && (flt.State == "HALT" || flt.State == "FAULT") { + if err == nil && (flt.State != nil && (*flt.State == "HALT" || *flt.State == "FAULT")) { filter = *flt } else if err == nil { err = errors.New("invalid state")