rpc: add "till" filter to WS block events
This commit is contained in:
parent
345d48d051
commit
8e84bb51d5
7 changed files with 51 additions and 18 deletions
|
@ -10,7 +10,7 @@ receive them as JSON-RPC notifications from the server.
|
||||||
Currently supported events:
|
Currently supported events:
|
||||||
* new block added
|
* new block added
|
||||||
|
|
||||||
Contents: block. Filters: primary ID, since block index.
|
Contents: block. Filters: primary ID, since/till block indexes.
|
||||||
* new transaction in the block
|
* new transaction in the block
|
||||||
|
|
||||||
Contents: transaction. Filters: sender and signer.
|
Contents: transaction. Filters: sender and signer.
|
||||||
|
@ -57,8 +57,10 @@ omitted if empty).
|
||||||
Recognized stream names:
|
Recognized stream names:
|
||||||
* `block_added`
|
* `block_added`
|
||||||
Filter: `primary` as an integer with primary (speaker) node index from
|
Filter: `primary` as an integer with primary (speaker) node index from
|
||||||
ConsensusData and/or `since` as an integer with block index starting from
|
ConsensusData and/or `since` field as an integer value with block
|
||||||
which new block notifications will be received.
|
index starting from which new block notifications will be received and/or
|
||||||
|
`till` field as an integer values containing block index till which new
|
||||||
|
block notifications will be received.
|
||||||
* `transaction_added`
|
* `transaction_added`
|
||||||
Filter: `sender` field containing a string with hex-encoded Uint160 (LE
|
Filter: `sender` field containing a string with hex-encoded Uint160 (LE
|
||||||
representation) for transaction's `Sender` and/or `signer` in the same
|
representation) for transaction's `Sender` and/or `signer` in the same
|
||||||
|
|
|
@ -39,7 +39,8 @@ func Matches(f Comparator, r Container) bool {
|
||||||
b := r.EventPayload().(*block.Block)
|
b := r.EventPayload().(*block.Block)
|
||||||
primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex)
|
primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex)
|
||||||
sinceOk := filt.Since == nil || *filt.Since <= b.Index
|
sinceOk := filt.Since == nil || *filt.Since <= b.Index
|
||||||
return primaryOk && sinceOk
|
tillOk := filt.Till == nil || b.Index <= *filt.Till
|
||||||
|
return primaryOk && sinceOk && tillOk
|
||||||
case neorpc.TransactionEventID:
|
case neorpc.TransactionEventID:
|
||||||
filt := filter.(neorpc.TxFilter)
|
filt := filter.(neorpc.TxFilter)
|
||||||
tx := r.EventPayload().(*transaction.Transaction)
|
tx := r.EventPayload().(*transaction.Transaction)
|
||||||
|
|
|
@ -43,6 +43,7 @@ func TestMatches(t *testing.T) {
|
||||||
badPrimary := 2
|
badPrimary := 2
|
||||||
index := uint32(5)
|
index := uint32(5)
|
||||||
badHigherIndex := uint32(6)
|
badHigherIndex := uint32(6)
|
||||||
|
badLowerIndex := index - 1
|
||||||
sender := util.Uint160{1, 2, 3}
|
sender := util.Uint160{1, 2, 3}
|
||||||
signer := util.Uint160{4, 5, 6}
|
signer := util.Uint160{4, 5, 6}
|
||||||
contract := util.Uint160{7, 8, 9}
|
contract := util.Uint160{7, 8, 9}
|
||||||
|
@ -126,11 +127,20 @@ func TestMatches(t *testing.T) {
|
||||||
container: bContainer,
|
container: bContainer,
|
||||||
expected: false,
|
expected: false,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "block, till mismatch",
|
||||||
|
comparator: testComparator{
|
||||||
|
id: neorpc.BlockEventID,
|
||||||
|
filter: neorpc.BlockFilter{Till: &badLowerIndex},
|
||||||
|
},
|
||||||
|
container: bContainer,
|
||||||
|
expected: false,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "block, filter match",
|
name: "block, filter match",
|
||||||
comparator: testComparator{
|
comparator: testComparator{
|
||||||
id: neorpc.BlockEventID,
|
id: neorpc.BlockEventID,
|
||||||
filter: neorpc.BlockFilter{Primary: &primary, Since: &index},
|
filter: neorpc.BlockFilter{Primary: &primary, Since: &index, Till: &index},
|
||||||
},
|
},
|
||||||
container: bContainer,
|
container: bContainer,
|
||||||
expected: true,
|
expected: true,
|
||||||
|
|
|
@ -77,6 +77,7 @@ type (
|
||||||
BlockFilter struct {
|
BlockFilter struct {
|
||||||
Primary *int `json:"primary,omitempty"`
|
Primary *int `json:"primary,omitempty"`
|
||||||
Since *uint32 `json:"since,omitempty"`
|
Since *uint32 `json:"since,omitempty"`
|
||||||
|
Till *uint32 `json:"till,omitempty"`
|
||||||
}
|
}
|
||||||
// TxFilter is a wrapper structure for the transaction event filter. It
|
// TxFilter is a wrapper structure for the transaction event filter. It
|
||||||
// allows to filter transactions by senders and signers.
|
// allows to filter transactions by senders and signers.
|
||||||
|
|
|
@ -44,7 +44,7 @@ type (
|
||||||
RPCEventWaiter interface {
|
RPCEventWaiter interface {
|
||||||
RPCPollingWaiter
|
RPCPollingWaiter
|
||||||
|
|
||||||
SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error)
|
SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error)
|
||||||
SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error)
|
SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error)
|
||||||
Unsubscribe(id string) error
|
Unsubscribe(id string) error
|
||||||
}
|
}
|
||||||
|
@ -139,7 +139,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (
|
||||||
}()
|
}()
|
||||||
// Execution event follows the block event, thus wait until the block next to the VUB to be sure.
|
// Execution event follows the block event, thus wait until the block next to the VUB to be sure.
|
||||||
since := vub + 1
|
since := vub + 1
|
||||||
blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, rcvr)
|
blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
|
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -396,19 +396,19 @@ func (c *WSClient) performUnsubscription(id string) error {
|
||||||
// of the client. It can be filtered by primary consensus node index and/or block
|
// of the client. It can be filtered by primary consensus node index and/or block
|
||||||
// index allowing to receive blocks since the specified index only, nil value is
|
// index allowing to receive blocks since the specified index only, nil value is
|
||||||
// treated as missing filter.
|
// treated as missing filter.
|
||||||
func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex *uint32) (string, error) {
|
func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) {
|
||||||
return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, c.Notifications)
|
return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the
|
// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the
|
||||||
// specified new blocks notifications. The receiver channel must be properly read and
|
// specified new blocks notifications. The receiver channel must be properly read and
|
||||||
// drained after usage in order not to block other notification receivers.
|
// drained after usage in order not to block other notification receivers.
|
||||||
// See SubscribeForNewBlocks for parameter details.
|
// See SubscribeForNewBlocks for parameter details.
|
||||||
func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex *uint32, rcvrCh chan<- Notification) (string, error) {
|
func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) {
|
||||||
params := []interface{}{"block_added"}
|
params := []interface{}{"block_added"}
|
||||||
var flt *neorpc.BlockFilter
|
var flt *neorpc.BlockFilter
|
||||||
if primary != nil || sinceIndex != nil {
|
if primary != nil || sinceIndex != nil || tillIndex != nil {
|
||||||
flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex}
|
flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex}
|
||||||
params = append(params, flt)
|
params = append(params, flt)
|
||||||
}
|
}
|
||||||
rcvr := notificationReceiver{
|
rcvr := notificationReceiver{
|
||||||
|
|
|
@ -35,10 +35,10 @@ func TestWSClientSubscription(t *testing.T) {
|
||||||
ch := make(chan Notification)
|
ch := make(chan Notification)
|
||||||
var cases = map[string]func(*WSClient) (string, error){
|
var cases = map[string]func(*WSClient) (string, error){
|
||||||
"blocks": func(wsc *WSClient) (string, error) {
|
"blocks": func(wsc *WSClient) (string, error) {
|
||||||
return wsc.SubscribeForNewBlocks(nil, nil)
|
return wsc.SubscribeForNewBlocks(nil, nil, nil)
|
||||||
},
|
},
|
||||||
"blocks_with_custom_ch": func(wsc *WSClient) (string, error) {
|
"blocks_with_custom_ch": func(wsc *WSClient) (string, error) {
|
||||||
return wsc.SubscribeForNewBlocksWithChan(nil, nil, ch)
|
return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch)
|
||||||
},
|
},
|
||||||
"transactions": func(wsc *WSClient) (string, error) {
|
"transactions": func(wsc *WSClient) (string, error) {
|
||||||
return wsc.SubscribeForNewTransactions(nil, nil)
|
return wsc.SubscribeForNewTransactions(nil, nil)
|
||||||
|
@ -288,7 +288,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
|
||||||
{"blocks primary",
|
{"blocks primary",
|
||||||
func(t *testing.T, wsc *WSClient) {
|
func(t *testing.T, wsc *WSClient) {
|
||||||
primary := 3
|
primary := 3
|
||||||
_, err := wsc.SubscribeForNewBlocks(&primary, nil)
|
_, err := wsc.SubscribeForNewBlocks(&primary, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
},
|
},
|
||||||
func(t *testing.T, p *params.Params) {
|
func(t *testing.T, p *params.Params) {
|
||||||
|
@ -297,12 +297,13 @@ func TestWSFilteredSubscriptions(t *testing.T) {
|
||||||
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
|
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
|
||||||
require.Equal(t, 3, *filt.Primary)
|
require.Equal(t, 3, *filt.Primary)
|
||||||
require.Equal(t, (*uint32)(nil), filt.Since)
|
require.Equal(t, (*uint32)(nil), filt.Since)
|
||||||
|
require.Equal(t, (*uint32)(nil), filt.Till)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{"blocks since",
|
{"blocks since",
|
||||||
func(t *testing.T, wsc *WSClient) {
|
func(t *testing.T, wsc *WSClient) {
|
||||||
var since uint32 = 3
|
var since uint32 = 3
|
||||||
_, err := wsc.SubscribeForNewBlocks(nil, &since)
|
_, err := wsc.SubscribeForNewBlocks(nil, &since, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
},
|
},
|
||||||
func(t *testing.T, p *params.Params) {
|
func(t *testing.T, p *params.Params) {
|
||||||
|
@ -311,15 +312,32 @@ func TestWSFilteredSubscriptions(t *testing.T) {
|
||||||
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
|
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
|
||||||
require.Equal(t, (*int)(nil), filt.Primary)
|
require.Equal(t, (*int)(nil), filt.Primary)
|
||||||
require.Equal(t, uint32(3), *filt.Since)
|
require.Equal(t, uint32(3), *filt.Since)
|
||||||
|
require.Equal(t, (*uint32)(nil), filt.Till)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{"blocks primary and since",
|
{"blocks till",
|
||||||
|
func(t *testing.T, wsc *WSClient) {
|
||||||
|
var till uint32 = 3
|
||||||
|
_, err := wsc.SubscribeForNewBlocks(nil, nil, &till)
|
||||||
|
require.NoError(t, err)
|
||||||
|
},
|
||||||
|
func(t *testing.T, p *params.Params) {
|
||||||
|
param := p.Value(1)
|
||||||
|
filt := new(neorpc.BlockFilter)
|
||||||
|
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
|
||||||
|
require.Equal(t, (*int)(nil), filt.Primary)
|
||||||
|
require.Equal(t, (*uint32)(nil), filt.Since)
|
||||||
|
require.Equal(t, (uint32)(3), *filt.Till)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{"blocks primary, since and till",
|
||||||
func(t *testing.T, wsc *WSClient) {
|
func(t *testing.T, wsc *WSClient) {
|
||||||
var (
|
var (
|
||||||
since uint32 = 3
|
since uint32 = 3
|
||||||
primary = 2
|
primary = 2
|
||||||
|
till uint32 = 5
|
||||||
)
|
)
|
||||||
_, err := wsc.SubscribeForNewBlocks(&primary, &since)
|
_, err := wsc.SubscribeForNewBlocks(&primary, &since, &till)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
},
|
},
|
||||||
func(t *testing.T, p *params.Params) {
|
func(t *testing.T, p *params.Params) {
|
||||||
|
@ -328,6 +346,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
|
||||||
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
|
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
|
||||||
require.Equal(t, 2, *filt.Primary)
|
require.Equal(t, 2, *filt.Primary)
|
||||||
require.Equal(t, uint32(3), *filt.Since)
|
require.Equal(t, uint32(3), *filt.Since)
|
||||||
|
require.Equal(t, uint32(5), *filt.Till)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{"transactions sender",
|
{"transactions sender",
|
||||||
|
|
Loading…
Reference in a new issue