rpc: add "since" filter to WS block events

This commit is contained in:
Anna Shaleva 2022-10-18 15:09:30 +03:00
parent 10a0716217
commit 673a495527
7 changed files with 75 additions and 25 deletions

View file

@ -10,7 +10,7 @@ receive them as JSON-RPC notifications from the server.
Currently supported events: Currently supported events:
* new block added * new block added
Contents: block. Filters: primary ID. Contents: block. Filters: primary ID, since block index.
* new transaction in the block * new transaction in the block
Contents: transaction. Filters: sender and signer. Contents: transaction. Filters: sender and signer.
@ -57,7 +57,8 @@ omitted if empty).
Recognized stream names: Recognized stream names:
* `block_added` * `block_added`
Filter: `primary` as an integer with primary (speaker) node index from Filter: `primary` as an integer with primary (speaker) node index from
ConsensusData. ConsensusData and/or `since` as an integer with block index starting from
which new block notifications will be received.
* `transaction_added` * `transaction_added`
Filter: `sender` field containing a string with hex-encoded Uint160 (LE Filter: `sender` field containing a string with hex-encoded Uint160 (LE
representation) for transaction's `Sender` and/or `signer` in the same representation) for transaction's `Sender` and/or `signer` in the same

View file

@ -37,7 +37,9 @@ func Matches(f Comparator, r Container) bool {
case neorpc.BlockEventID: case neorpc.BlockEventID:
filt := filter.(neorpc.BlockFilter) filt := filter.(neorpc.BlockFilter)
b := r.EventPayload().(*block.Block) b := r.EventPayload().(*block.Block)
return int(b.PrimaryIndex) == filt.Primary primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex)
sinceOk := filt.Since == nil || *filt.Since <= b.Index
return primaryOk && sinceOk
case neorpc.TransactionEventID: case neorpc.TransactionEventID:
filt := filter.(neorpc.TxFilter) filt := filter.(neorpc.TxFilter)
tx := r.EventPayload().(*transaction.Transaction) tx := r.EventPayload().(*transaction.Transaction)

View file

@ -39,7 +39,10 @@ func (c testContainer) EventPayload() interface{} {
} }
func TestMatches(t *testing.T) { func TestMatches(t *testing.T) {
primary := byte(1) primary := 1
badPrimary := 2
index := uint32(5)
badHigherIndex := uint32(6)
sender := util.Uint160{1, 2, 3} sender := util.Uint160{1, 2, 3}
signer := util.Uint160{4, 5, 6} signer := util.Uint160{4, 5, 6}
contract := util.Uint160{7, 8, 9} contract := util.Uint160{7, 8, 9}
@ -49,7 +52,7 @@ func TestMatches(t *testing.T) {
bContainer := testContainer{ bContainer := testContainer{
id: neorpc.BlockEventID, id: neorpc.BlockEventID,
pld: &block.Block{ pld: &block.Block{
Header: block.Header{PrimaryIndex: primary}, Header: block.Header{PrimaryIndex: byte(primary), Index: index},
}, },
} }
st := vmstate.Halt st := vmstate.Halt
@ -106,7 +109,16 @@ func TestMatches(t *testing.T) {
name: "block, primary mismatch", name: "block, primary mismatch",
comparator: testComparator{ comparator: testComparator{
id: neorpc.BlockEventID, id: neorpc.BlockEventID,
filter: neorpc.BlockFilter{Primary: int(primary + 1)}, filter: neorpc.BlockFilter{Primary: &badPrimary},
},
container: bContainer,
expected: false,
},
{
name: "block, since mismatch",
comparator: testComparator{
id: neorpc.BlockEventID,
filter: neorpc.BlockFilter{Since: &badHigherIndex},
}, },
container: bContainer, container: bContainer,
expected: false, expected: false,
@ -115,7 +127,7 @@ func TestMatches(t *testing.T) {
name: "block, filter match", name: "block, filter match",
comparator: testComparator{ comparator: testComparator{
id: neorpc.BlockEventID, id: neorpc.BlockEventID,
filter: neorpc.BlockFilter{Primary: int(primary)}, filter: neorpc.BlockFilter{Primary: &primary, Since: &index},
}, },
container: bContainer, container: bContainer,
expected: true, expected: true,

View file

@ -71,10 +71,12 @@ type (
Payload []interface{} `json:"params"` Payload []interface{} `json:"params"`
} }
// BlockFilter is a wrapper structure for the block event filter. The only // BlockFilter is a wrapper structure for the block event filter. It allows
// allowed filter is primary index. // to filter blocks by primary index and by block index (allowing blocks since
// the specified index).
BlockFilter struct { BlockFilter struct {
Primary int `json:"primary"` Primary *int `json:"primary,omitempty"`
Since *uint32 `json:"since,omitempty"`
} }
// TxFilter is a wrapper structure for the transaction event filter. It // TxFilter is a wrapper structure for the transaction event filter. It
// allows to filter transactions by senders and signers. // allows to filter transactions by senders and signers.

View file

@ -45,7 +45,7 @@ type (
RPCEventWaiter interface { RPCEventWaiter interface {
RPCPollingWaiter RPCPollingWaiter
SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- rpcclient.Notification) (string, error) SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error)
SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error) SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error)
Unsubscribe(id string) error Unsubscribe(id string) error
} }
@ -138,7 +138,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (
} }
close(rcvr) close(rcvr)
}() }()
blocksID, err := c.SubscribeForNewBlocksWithChan(nil, rcvr) blocksID, err := c.SubscribeForNewBlocksWithChan(nil, nil, rcvr)
if err != nil { if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
return return

View file

@ -393,22 +393,23 @@ func (c *WSClient) performUnsubscription(id string) error {
} }
// SubscribeForNewBlocks adds subscription for new block events to this instance // SubscribeForNewBlocks adds subscription for new block events to this instance
// of the client. It can be filtered by primary consensus node index, nil value doesn't // of the client. It can be filtered by primary consensus node index and/or block
// add any filters. // index allowing to receive blocks since the specified index only, nil value is
func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) { // treated as missing filter.
return c.SubscribeForNewBlocksWithChan(primary, c.Notifications) func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex *uint32) (string, error) {
return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, c.Notifications)
} }
// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the // SubscribeForNewBlocksWithChan registers provided channel as a receiver for the
// specified new blocks notifications. The receiver channel must be properly read and // specified new blocks notifications. The receiver channel must be properly read and
// drained after usage in order not to block other notification receivers. // drained after usage in order not to block other notification receivers.
// See SubscribeForNewBlocks for parameter details. // See SubscribeForNewBlocks for parameter details.
func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- Notification) (string, error) { func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex *uint32, rcvrCh chan<- Notification) (string, error) {
params := []interface{}{"block_added"} params := []interface{}{"block_added"}
var flt *neorpc.BlockFilter var flt *neorpc.BlockFilter
if primary != nil { if primary != nil || sinceIndex != nil {
flt = &neorpc.BlockFilter{Primary: *primary} flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex}
params = append(params, *flt) params = append(params, flt)
} }
rcvr := notificationReceiver{ rcvr := notificationReceiver{
typ: neorpc.BlockEventID, typ: neorpc.BlockEventID,

View file

@ -35,10 +35,10 @@ func TestWSClientSubscription(t *testing.T) {
ch := make(chan Notification) ch := make(chan Notification)
var cases = map[string]func(*WSClient) (string, error){ var cases = map[string]func(*WSClient) (string, error){
"blocks": func(wsc *WSClient) (string, error) { "blocks": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewBlocks(nil) return wsc.SubscribeForNewBlocks(nil, nil)
}, },
"blocks_with_custom_ch": func(wsc *WSClient) (string, error) { "blocks_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewBlocksWithChan(nil, ch) return wsc.SubscribeForNewBlocksWithChan(nil, nil, ch)
}, },
"transactions": func(wsc *WSClient) (string, error) { "transactions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewTransactions(nil, nil) return wsc.SubscribeForNewTransactions(nil, nil)
@ -283,17 +283,49 @@ func TestWSFilteredSubscriptions(t *testing.T) {
clientCode func(*testing.T, *WSClient) clientCode func(*testing.T, *WSClient)
serverCode func(*testing.T, *params.Params) serverCode func(*testing.T, *params.Params)
}{ }{
{"blocks", {"blocks primary",
func(t *testing.T, wsc *WSClient) { func(t *testing.T, wsc *WSClient) {
primary := 3 primary := 3
_, err := wsc.SubscribeForNewBlocks(&primary) _, err := wsc.SubscribeForNewBlocks(&primary, nil)
require.NoError(t, err) require.NoError(t, err)
}, },
func(t *testing.T, p *params.Params) { func(t *testing.T, p *params.Params) {
param := p.Value(1) param := p.Value(1)
filt := new(neorpc.BlockFilter) filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt)) require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 3, filt.Primary) require.Equal(t, 3, *filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
},
},
{"blocks since",
func(t *testing.T, wsc *WSClient) {
var since uint32 = 3
_, err := wsc.SubscribeForNewBlocks(nil, &since)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
},
},
{"blocks primary and since",
func(t *testing.T, wsc *WSClient) {
var (
since uint32 = 3
primary = 2
)
_, err := wsc.SubscribeForNewBlocks(&primary, &since)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 2, *filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
}, },
}, },
{"transactions sender", {"transactions sender",