diff --git a/docs/notifications.md b/docs/notifications.md index a35867a02..8a2b7ffdb 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -10,7 +10,7 @@ receive them as JSON-RPC notifications from the server. Currently supported events: * new block added - Contents: block. Filters: primary ID, since block index. + Contents: block. Filters: primary ID, since/till block indexes. * new transaction in the block Contents: transaction. Filters: sender and signer. @@ -57,8 +57,10 @@ omitted if empty). Recognized stream names: * `block_added` Filter: `primary` as an integer with primary (speaker) node index from - ConsensusData and/or `since` as an integer with block index starting from - which new block notifications will be received. + ConsensusData and/or `since` field as an integer value with block + index starting from which new block notifications will be received and/or + `till` field as an integer values containing block index till which new + block notifications will be received. * `transaction_added` Filter: `sender` field containing a string with hex-encoded Uint160 (LE representation) for transaction's `Sender` and/or `signer` in the same diff --git a/pkg/neorpc/rpcevent/filter.go b/pkg/neorpc/rpcevent/filter.go index 65de9df59..049943498 100644 --- a/pkg/neorpc/rpcevent/filter.go +++ b/pkg/neorpc/rpcevent/filter.go @@ -39,7 +39,8 @@ func Matches(f Comparator, r Container) bool { b := r.EventPayload().(*block.Block) primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex) sinceOk := filt.Since == nil || *filt.Since <= b.Index - return primaryOk && sinceOk + tillOk := filt.Till == nil || b.Index <= *filt.Till + return primaryOk && sinceOk && tillOk case neorpc.TransactionEventID: filt := filter.(neorpc.TxFilter) tx := r.EventPayload().(*transaction.Transaction) diff --git a/pkg/neorpc/rpcevent/filter_test.go b/pkg/neorpc/rpcevent/filter_test.go index 9e36a7b54..6afd20d13 100644 --- a/pkg/neorpc/rpcevent/filter_test.go +++ b/pkg/neorpc/rpcevent/filter_test.go @@ -43,6 +43,7 @@ func TestMatches(t *testing.T) { badPrimary := 2 index := uint32(5) badHigherIndex := uint32(6) + badLowerIndex := index - 1 sender := util.Uint160{1, 2, 3} signer := util.Uint160{4, 5, 6} contract := util.Uint160{7, 8, 9} @@ -126,11 +127,20 @@ func TestMatches(t *testing.T) { container: bContainer, expected: false, }, + { + name: "block, till mismatch", + comparator: testComparator{ + id: neorpc.BlockEventID, + filter: neorpc.BlockFilter{Till: &badLowerIndex}, + }, + container: bContainer, + expected: false, + }, { name: "block, filter match", comparator: testComparator{ id: neorpc.BlockEventID, - filter: neorpc.BlockFilter{Primary: &primary, Since: &index}, + filter: neorpc.BlockFilter{Primary: &primary, Since: &index, Till: &index}, }, container: bContainer, expected: true, diff --git a/pkg/neorpc/types.go b/pkg/neorpc/types.go index f5cf2882f..aa20ec50c 100644 --- a/pkg/neorpc/types.go +++ b/pkg/neorpc/types.go @@ -77,6 +77,7 @@ type ( BlockFilter struct { Primary *int `json:"primary,omitempty"` Since *uint32 `json:"since,omitempty"` + Till *uint32 `json:"till,omitempty"` } // TxFilter is a wrapper structure for the transaction event filter. It // allows to filter transactions by senders and signers. diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 9085d9045..5c19ebccd 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -44,7 +44,7 @@ type ( RPCEventWaiter interface { RPCPollingWaiter - SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) + SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) Unsubscribe(id string) error } @@ -139,7 +139,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( }() // Execution event follows the block event, thus wait until the block next to the VUB to be sure. since := vub + 1 - blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, rcvr) + blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 8eafd55af..da1018639 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -396,19 +396,19 @@ func (c *WSClient) performUnsubscription(id string) error { // of the client. It can be filtered by primary consensus node index and/or block // index allowing to receive blocks since the specified index only, nil value is // treated as missing filter. -func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex *uint32) (string, error) { - return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, c.Notifications) +func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) { + return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications) } // SubscribeForNewBlocksWithChan registers provided channel as a receiver for the // specified new blocks notifications. The receiver channel must be properly read and // drained after usage in order not to block other notification receivers. // See SubscribeForNewBlocks for parameter details. -func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex *uint32, rcvrCh chan<- Notification) (string, error) { +func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"block_added"} var flt *neorpc.BlockFilter - if primary != nil || sinceIndex != nil { - flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex} + if primary != nil || sinceIndex != nil || tillIndex != nil { + flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex} params = append(params, flt) } rcvr := notificationReceiver{ diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 3788182e5..5662b1a32 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -35,10 +35,10 @@ func TestWSClientSubscription(t *testing.T) { ch := make(chan Notification) var cases = map[string]func(*WSClient) (string, error){ "blocks": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocks(nil, nil) + return wsc.SubscribeForNewBlocks(nil, nil, nil) }, "blocks_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocksWithChan(nil, nil, ch) + return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch) }, "transactions": func(wsc *WSClient) (string, error) { return wsc.SubscribeForNewTransactions(nil, nil) @@ -288,7 +288,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks primary", func(t *testing.T, wsc *WSClient) { primary := 3 - _, err := wsc.SubscribeForNewBlocks(&primary, nil) + _, err := wsc.SubscribeForNewBlocks(&primary, nil, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -297,12 +297,13 @@ func TestWSFilteredSubscriptions(t *testing.T) { require.NoError(t, json.Unmarshal(param.RawMessage, filt)) require.Equal(t, 3, *filt.Primary) require.Equal(t, (*uint32)(nil), filt.Since) + require.Equal(t, (*uint32)(nil), filt.Till) }, }, {"blocks since", func(t *testing.T, wsc *WSClient) { var since uint32 = 3 - _, err := wsc.SubscribeForNewBlocks(nil, &since) + _, err := wsc.SubscribeForNewBlocks(nil, &since, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -311,15 +312,32 @@ func TestWSFilteredSubscriptions(t *testing.T) { require.NoError(t, json.Unmarshal(param.RawMessage, filt)) require.Equal(t, (*int)(nil), filt.Primary) require.Equal(t, uint32(3), *filt.Since) + require.Equal(t, (*uint32)(nil), filt.Till) }, }, - {"blocks primary and since", + {"blocks till", + func(t *testing.T, wsc *WSClient) { + var till uint32 = 3 + _, err := wsc.SubscribeForNewBlocks(nil, nil, &till) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, (*int)(nil), filt.Primary) + require.Equal(t, (*uint32)(nil), filt.Since) + require.Equal(t, (uint32)(3), *filt.Till) + }, + }, + {"blocks primary, since and till", func(t *testing.T, wsc *WSClient) { var ( since uint32 = 3 primary = 2 + till uint32 = 5 ) - _, err := wsc.SubscribeForNewBlocks(&primary, &since) + _, err := wsc.SubscribeForNewBlocks(&primary, &since, &till) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -328,6 +346,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { require.NoError(t, json.Unmarshal(param.RawMessage, filt)) require.Equal(t, 2, *filt.Primary) require.Equal(t, uint32(3), *filt.Since) + require.Equal(t, uint32(5), *filt.Till) }, }, {"transactions sender",