From 673a495527602e754ad5670e3f8a097046d1cbd9 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 18 Oct 2022 15:09:30 +0300 Subject: [PATCH] rpc: add "since" filter to WS block events --- docs/notifications.md | 5 ++-- pkg/neorpc/rpcevent/filter.go | 4 ++- pkg/neorpc/rpcevent/filter_test.go | 20 +++++++++++--- pkg/neorpc/types.go | 8 +++--- pkg/rpcclient/actor/waiter.go | 4 +-- pkg/rpcclient/wsclient.go | 17 ++++++------ pkg/rpcclient/wsclient_test.go | 42 ++++++++++++++++++++++++++---- 7 files changed, 75 insertions(+), 25 deletions(-) diff --git a/docs/notifications.md b/docs/notifications.md index 7c73c5142..7c857fda8 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -10,7 +10,7 @@ receive them as JSON-RPC notifications from the server. Currently supported events: * new block added - Contents: block. Filters: primary ID. + Contents: block. Filters: primary ID, since block index. * new transaction in the block Contents: transaction. Filters: sender and signer. @@ -57,7 +57,8 @@ omitted if empty). Recognized stream names: * `block_added` Filter: `primary` as an integer with primary (speaker) node index from - ConsensusData. + ConsensusData and/or `since` as an integer with block index starting from + which new block notifications will be received. * `transaction_added` Filter: `sender` field containing a string with hex-encoded Uint160 (LE representation) for transaction's `Sender` and/or `signer` in the same diff --git a/pkg/neorpc/rpcevent/filter.go b/pkg/neorpc/rpcevent/filter.go index e2ca74c1b..07f0dc9f7 100644 --- a/pkg/neorpc/rpcevent/filter.go +++ b/pkg/neorpc/rpcevent/filter.go @@ -37,7 +37,9 @@ func Matches(f Comparator, r Container) bool { case neorpc.BlockEventID: filt := filter.(neorpc.BlockFilter) b := r.EventPayload().(*block.Block) - return int(b.PrimaryIndex) == filt.Primary + primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex) + sinceOk := filt.Since == nil || *filt.Since <= b.Index + return primaryOk && sinceOk case neorpc.TransactionEventID: filt := filter.(neorpc.TxFilter) tx := r.EventPayload().(*transaction.Transaction) diff --git a/pkg/neorpc/rpcevent/filter_test.go b/pkg/neorpc/rpcevent/filter_test.go index d16c222e6..71edeb7d1 100644 --- a/pkg/neorpc/rpcevent/filter_test.go +++ b/pkg/neorpc/rpcevent/filter_test.go @@ -39,7 +39,10 @@ func (c testContainer) EventPayload() interface{} { } func TestMatches(t *testing.T) { - primary := byte(1) + primary := 1 + badPrimary := 2 + index := uint32(5) + badHigherIndex := uint32(6) sender := util.Uint160{1, 2, 3} signer := util.Uint160{4, 5, 6} contract := util.Uint160{7, 8, 9} @@ -49,7 +52,7 @@ func TestMatches(t *testing.T) { bContainer := testContainer{ id: neorpc.BlockEventID, pld: &block.Block{ - Header: block.Header{PrimaryIndex: primary}, + Header: block.Header{PrimaryIndex: byte(primary), Index: index}, }, } st := vmstate.Halt @@ -106,7 +109,16 @@ func TestMatches(t *testing.T) { name: "block, primary mismatch", comparator: testComparator{ id: neorpc.BlockEventID, - filter: neorpc.BlockFilter{Primary: int(primary + 1)}, + filter: neorpc.BlockFilter{Primary: &badPrimary}, + }, + container: bContainer, + expected: false, + }, + { + name: "block, since mismatch", + comparator: testComparator{ + id: neorpc.BlockEventID, + filter: neorpc.BlockFilter{Since: &badHigherIndex}, }, container: bContainer, expected: false, @@ -115,7 +127,7 @@ func TestMatches(t *testing.T) { name: "block, filter match", comparator: testComparator{ id: neorpc.BlockEventID, - filter: neorpc.BlockFilter{Primary: int(primary)}, + filter: neorpc.BlockFilter{Primary: &primary, Since: &index}, }, container: bContainer, expected: true, diff --git a/pkg/neorpc/types.go b/pkg/neorpc/types.go index ff01cbfa7..70bab912c 100644 --- a/pkg/neorpc/types.go +++ b/pkg/neorpc/types.go @@ -71,10 +71,12 @@ type ( Payload []interface{} `json:"params"` } - // BlockFilter is a wrapper structure for the block event filter. The only - // allowed filter is primary index. + // BlockFilter is a wrapper structure for the block event filter. It allows + // to filter blocks by primary index and by block index (allowing blocks since + // the specified index). BlockFilter struct { - Primary int `json:"primary"` + Primary *int `json:"primary,omitempty"` + Since *uint32 `json:"since,omitempty"` } // TxFilter is a wrapper structure for the transaction event filter. It // allows to filter transactions by senders and signers. diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index f42707631..21f9a0edf 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -45,7 +45,7 @@ type ( RPCEventWaiter interface { RPCPollingWaiter - SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- rpcclient.Notification) (string, error) + SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error) Unsubscribe(id string) error } @@ -138,7 +138,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( } close(rcvr) }() - blocksID, err := c.SubscribeForNewBlocksWithChan(nil, rcvr) + blocksID, err := c.SubscribeForNewBlocksWithChan(nil, nil, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index ad68c7d39..9a6d4da90 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -393,22 +393,23 @@ func (c *WSClient) performUnsubscription(id string) error { } // SubscribeForNewBlocks adds subscription for new block events to this instance -// of the client. It can be filtered by primary consensus node index, nil value doesn't -// add any filters. -func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) { - return c.SubscribeForNewBlocksWithChan(primary, c.Notifications) +// of the client. It can be filtered by primary consensus node index and/or block +// index allowing to receive blocks since the specified index only, nil value is +// treated as missing filter. +func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex *uint32) (string, error) { + return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, c.Notifications) } // SubscribeForNewBlocksWithChan registers provided channel as a receiver for the // specified new blocks notifications. The receiver channel must be properly read and // drained after usage in order not to block other notification receivers. // See SubscribeForNewBlocks for parameter details. -func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- Notification) (string, error) { +func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex *uint32, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"block_added"} var flt *neorpc.BlockFilter - if primary != nil { - flt = &neorpc.BlockFilter{Primary: *primary} - params = append(params, *flt) + if primary != nil || sinceIndex != nil { + flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex} + params = append(params, flt) } rcvr := notificationReceiver{ typ: neorpc.BlockEventID, diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 1bc396a21..6f6ec9bdf 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -35,10 +35,10 @@ func TestWSClientSubscription(t *testing.T) { ch := make(chan Notification) var cases = map[string]func(*WSClient) (string, error){ "blocks": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocks(nil) + return wsc.SubscribeForNewBlocks(nil, nil) }, "blocks_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocksWithChan(nil, ch) + return wsc.SubscribeForNewBlocksWithChan(nil, nil, ch) }, "transactions": func(wsc *WSClient) (string, error) { return wsc.SubscribeForNewTransactions(nil, nil) @@ -283,17 +283,49 @@ func TestWSFilteredSubscriptions(t *testing.T) { clientCode func(*testing.T, *WSClient) serverCode func(*testing.T, *params.Params) }{ - {"blocks", + {"blocks primary", func(t *testing.T, wsc *WSClient) { primary := 3 - _, err := wsc.SubscribeForNewBlocks(&primary) + _, err := wsc.SubscribeForNewBlocks(&primary, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { param := p.Value(1) filt := new(neorpc.BlockFilter) require.NoError(t, json.Unmarshal(param.RawMessage, filt)) - require.Equal(t, 3, filt.Primary) + require.Equal(t, 3, *filt.Primary) + require.Equal(t, (*uint32)(nil), filt.Since) + }, + }, + {"blocks since", + func(t *testing.T, wsc *WSClient) { + var since uint32 = 3 + _, err := wsc.SubscribeForNewBlocks(nil, &since) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, (*int)(nil), filt.Primary) + require.Equal(t, uint32(3), *filt.Since) + }, + }, + {"blocks primary and since", + func(t *testing.T, wsc *WSClient) { + var ( + since uint32 = 3 + primary = 2 + ) + _, err := wsc.SubscribeForNewBlocks(&primary, &since) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, 2, *filt.Primary) + require.Equal(t, uint32(3), *filt.Since) }, }, {"transactions sender",