rpc: add "container" filter to WS execution notifications

This commit is contained in:
Anna Shaleva 2022-10-18 15:20:55 +03:00
parent 71069b0ed0
commit d7c1f3eac7
8 changed files with 77 additions and 26 deletions

View file

@ -19,7 +19,7 @@ Currently supported events:
Contents: container hash, contract hash, notification name, stack item. Filters: contract hash, notification name. Contents: container hash, contract hash, notification name, stack item. Filters: contract hash, notification name.
* transaction executed * transaction executed
Contents: application execution result. Filters: VM state. Contents: application execution result. Filters: VM state, script container hash.
* new/removed P2P notary request (if `P2PSigExtensions` are enabled) * new/removed P2P notary request (if `P2PSigExtensions` are enabled)
Contents: P2P notary request. Filters: request sender and main tx signer. Contents: P2P notary request. Filters: request sender and main tx signer.
@ -69,7 +69,8 @@ Recognized stream names:
notification name. notification name.
* `transaction_executed` * `transaction_executed`
Filter: `state` field containing `HALT` or `FAULT` string for successful Filter: `state` field containing `HALT` or `FAULT` string for successful
and failed executions respectively. and failed executions respectively and/or `container` field containing
script container hash.
* `notary_request_event` * `notary_request_event`
Filter: `sender` field containing a string with hex-encoded Uint160 (LE Filter: `sender` field containing a string with hex-encoded Uint160 (LE
representation) for notary request's `Sender` and/or `signer` in the same representation) for notary request's `Sender` and/or `signer` in the same

View file

@ -64,7 +64,9 @@ func Matches(f Comparator, r Container) bool {
case neorpc.ExecutionEventID: case neorpc.ExecutionEventID:
filt := filter.(neorpc.ExecutionFilter) filt := filter.(neorpc.ExecutionFilter)
applog := r.EventPayload().(*state.AppExecResult) applog := r.EventPayload().(*state.AppExecResult)
return applog.VMState.String() == filt.State stateOK := filt.State == nil || applog.VMState.String() == *filt.State
containerOK := filt.Container == nil || applog.Container.Equals(*filt.Container)
return stateOK && containerOK
case neorpc.NotaryRequestEventID: case neorpc.NotaryRequestEventID:
filt := filter.(neorpc.TxFilter) filt := filter.(neorpc.TxFilter)
req := r.EventPayload().(*result.NotaryRequestEvent) req := r.EventPayload().(*result.NotaryRequestEvent)

View file

@ -47,6 +47,8 @@ func TestMatches(t *testing.T) {
signer := util.Uint160{4, 5, 6} signer := util.Uint160{4, 5, 6}
contract := util.Uint160{7, 8, 9} contract := util.Uint160{7, 8, 9}
badUint160 := util.Uint160{9, 9, 9} badUint160 := util.Uint160{9, 9, 9}
cnt := util.Uint256{1, 2, 3}
badUint256 := util.Uint256{9, 9, 9}
name := "ntf name" name := "ntf name"
badName := "bad name" badName := "bad name"
bContainer := testContainer{ bContainer := testContainer{
@ -56,6 +58,7 @@ func TestMatches(t *testing.T) {
}, },
} }
st := vmstate.Halt st := vmstate.Halt
goodState := st.String()
badState := "FAULT" badState := "FAULT"
txContainer := testContainer{ txContainer := testContainer{
id: neorpc.TransactionEventID, id: neorpc.TransactionEventID,
@ -67,7 +70,7 @@ func TestMatches(t *testing.T) {
} }
exContainer := testContainer{ exContainer := testContainer{
id: neorpc.ExecutionEventID, id: neorpc.ExecutionEventID,
pld: &state.AppExecResult{Execution: state.Execution{VMState: st}}, pld: &state.AppExecResult{Container: cnt, Execution: state.Execution{VMState: st}},
} }
ntrContainer := testContainer{ ntrContainer := testContainer{
id: neorpc.NotaryRequestEventID, id: neorpc.NotaryRequestEventID,
@ -208,7 +211,16 @@ func TestMatches(t *testing.T) {
name: "execution, state mismatch", name: "execution, state mismatch",
comparator: testComparator{ comparator: testComparator{
id: neorpc.ExecutionEventID, id: neorpc.ExecutionEventID,
filter: neorpc.ExecutionFilter{State: badState}, filter: neorpc.ExecutionFilter{State: &badState},
},
container: exContainer,
expected: false,
},
{
name: "execution, container mismatch",
comparator: testComparator{
id: neorpc.ExecutionEventID,
filter: neorpc.ExecutionFilter{Container: &badUint256},
}, },
container: exContainer, container: exContainer,
expected: false, expected: false,
@ -217,7 +229,7 @@ func TestMatches(t *testing.T) {
name: "execution, filter mismatch", name: "execution, filter mismatch",
comparator: testComparator{ comparator: testComparator{
id: neorpc.ExecutionEventID, id: neorpc.ExecutionEventID,
filter: neorpc.ExecutionFilter{State: st.String()}, filter: neorpc.ExecutionFilter{State: &goodState, Container: &cnt},
}, },
container: exContainer, container: exContainer,
expected: true, expected: true,

View file

@ -95,7 +95,8 @@ type (
// events. It allows to choose failing or successful transactions based // events. It allows to choose failing or successful transactions based
// on their VM state. // on their VM state.
ExecutionFilter struct { ExecutionFilter struct {
State string `json:"state"` State *string `json:"state,omitempty"`
Container *util.Uint256 `json:"container,omitempty"`
} }
// SignerWithWitness represents transaction's signer with the corresponding witness. // SignerWithWitness represents transaction's signer with the corresponding witness.
SignerWithWitness struct { SignerWithWitness struct {

View file

@ -45,7 +45,7 @@ type (
RPCPollingWaiter RPCPollingWaiter
SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error)
SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error)
Unsubscribe(id string) error Unsubscribe(id string) error
} }
) )
@ -156,7 +156,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (
waitErr = fmt.Errorf(errFmt, errArgs...) waitErr = fmt.Errorf(errFmt, errArgs...)
} }
}() }()
txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, rcvr) txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, nil, rcvr)
if err != nil { if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return return

View file

@ -475,23 +475,26 @@ func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uin
// SubscribeForTransactionExecutions adds subscription for application execution // SubscribeForTransactionExecutions adds subscription for application execution
// results generated during transaction execution to this instance of the client. It can // results generated during transaction execution to this instance of the client. It can
// be filtered by state (HALT/FAULT) to check for successful or failing // be filtered by state (HALT/FAULT) to check for successful or failing
// transactions, nil value means no filtering. // transactions; it can also be filtered by script container hash.
func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) { // nil value means no filtering.
return c.SubscribeForTransactionExecutionsWithChan(state, c.Notifications) func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) {
return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications)
} }
// SubscribeForTransactionExecutionsWithChan registers provided channel as a // SubscribeForTransactionExecutionsWithChan registers provided channel as a
// receiver for the specified execution notifications. The receiver channel must be // receiver for the specified execution notifications. The receiver channel must be
// properly read and drained after usage in order not to block other notification // properly read and drained after usage in order not to block other notification
// receivers. See SubscribeForTransactionExecutions for parameter details. // receivers. See SubscribeForTransactionExecutions for parameter details.
func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- Notification) (string, error) { func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) {
params := []interface{}{"transaction_executed"} params := []interface{}{"transaction_executed"}
var flt *neorpc.ExecutionFilter var flt *neorpc.ExecutionFilter
if state != nil || container != nil {
if state != nil { if state != nil {
if *state != "HALT" && *state != "FAULT" { if *state != "HALT" && *state != "FAULT" {
return "", errors.New("bad state parameter") return "", errors.New("bad state parameter")
} }
flt = &neorpc.ExecutionFilter{State: *state} }
flt = &neorpc.ExecutionFilter{State: state, Container: container}
params = append(params, *flt) params = append(params, *flt)
} }
rcvr := notificationReceiver{ rcvr := notificationReceiver{

View file

@ -53,10 +53,10 @@ func TestWSClientSubscription(t *testing.T) {
return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch) return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch)
}, },
"executions": func(wsc *WSClient) (string, error) { "executions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutions(nil) return wsc.SubscribeForTransactionExecutions(nil, nil)
}, },
"executions_with_custom_ch": func(wsc *WSClient) (string, error) { "executions_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutionsWithChan(nil, ch) return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch)
}, },
} }
t.Run("good", func(t *testing.T) { t.Run("good", func(t *testing.T) {
@ -206,6 +206,8 @@ func TestWSClientEvents(t *testing.T) {
ch1 := make(chan Notification) ch1 := make(chan Notification)
ch2 := make(chan Notification) ch2 := make(chan Notification)
ch3 := make(chan Notification) ch3 := make(chan Notification)
halt := "HALT"
fault := "FAULT"
wsc.subscriptionsLock.Lock() wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications}
@ -213,8 +215,8 @@ func TestWSClientEvents(t *testing.T) {
wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1} wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1}
wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2}
wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions
wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: "HALT"}, ch: ch2} wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &halt}, ch: ch2}
wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: "FAULT"}, ch: ch3} wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &fault}, ch: ch3}
// MissedEvent must be delivered without subscription. // MissedEvent must be delivered without subscription.
wsc.subscriptionsLock.Unlock() wsc.subscriptionsLock.Unlock()
@ -272,7 +274,7 @@ func TestWSExecutionVMStateCheck(t *testing.T) {
wsc.getNextRequestID = getTestRequestID wsc.getNextRequestID = getTestRequestID
require.NoError(t, wsc.Init()) require.NoError(t, wsc.Init())
filter := "NONE" filter := "NONE"
_, err = wsc.SubscribeForTransactionExecutions(&filter) _, err = wsc.SubscribeForTransactionExecutions(&filter, nil)
require.Error(t, err) require.Error(t, err)
wsc.Close() wsc.Close()
} }
@ -414,17 +416,47 @@ func TestWSFilteredSubscriptions(t *testing.T) {
require.Equal(t, "my_pretty_notification", *filt.Name) require.Equal(t, "my_pretty_notification", *filt.Name)
}, },
}, },
{"executions", {"executions state",
func(t *testing.T, wsc *WSClient) { func(t *testing.T, wsc *WSClient) {
state := "FAULT" state := "FAULT"
_, err := wsc.SubscribeForTransactionExecutions(&state) _, err := wsc.SubscribeForTransactionExecutions(&state, nil)
require.NoError(t, err) require.NoError(t, err)
}, },
func(t *testing.T, p *params.Params) { func(t *testing.T, p *params.Params) {
param := p.Value(1) param := p.Value(1)
filt := new(neorpc.ExecutionFilter) filt := new(neorpc.ExecutionFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt)) require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, "FAULT", filt.State) require.Equal(t, "FAULT", *filt.State)
require.Equal(t, (*util.Uint256)(nil), filt.Container)
},
},
{"executions container",
func(t *testing.T, wsc *WSClient) {
container := util.Uint256{1, 2, 3}
_, err := wsc.SubscribeForTransactionExecutions(nil, &container)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.ExecutionFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*string)(nil), filt.State)
require.Equal(t, util.Uint256{1, 2, 3}, *filt.Container)
},
},
{"executions state and container",
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
container := util.Uint256{1, 2, 3}
_, err := wsc.SubscribeForTransactionExecutions(&state, &container)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.ExecutionFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, "FAULT", *filt.State)
require.Equal(t, util.Uint256{1, 2, 3}, *filt.Container)
}, },
}, },
} }

View file

@ -2426,7 +2426,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
case neorpc.ExecutionEventID: case neorpc.ExecutionEventID:
flt := new(neorpc.ExecutionFilter) flt := new(neorpc.ExecutionFilter)
err = jd.Decode(flt) err = jd.Decode(flt)
if err == nil && (flt.State == "HALT" || flt.State == "FAULT") { if err == nil && (flt.State != nil && (*flt.State == "HALT" || *flt.State == "FAULT")) {
filter = *flt filter = *flt
} else if err == nil { } else if err == nil {
err = errors.New("invalid state") err = errors.New("invalid state")