rpc: add new header_of_added_block event subscription

New event is to notify the user about header's content by the moment
when block is stored (which happens after block's processing). This is
needed for proper Waiter work.

Closes #2751.

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2023-12-22 11:23:41 +03:00
parent 56d32a010e
commit 5d514538cf
9 changed files with 365 additions and 8 deletions

View file

@ -27,11 +27,13 @@ Currently supported events:
Filters use conjunctional logic.
## Ordering and persistence guarantees
* new block is only announced after its processing is complete and the chain
is updated to the new height
* new block and header of this block are only announced after block's processing
is complete and the chain is updated to the new height
* no disk-level persistence guarantees are given
* new in-block transaction is announced after block processing, but before
* header of newly added block is announced after block processing, but before
announcing the block itself
* new in-block transaction is announced after block processing, but before
announcing the block header and the block itself
* transaction notifications are only announced for successful transactions
* all announcements are being done in the same order they happen on the chain.
First, OnPersist script execution is announced followed by notifications generated
@ -40,7 +42,8 @@ Filters use conjunctional logic.
transaction announcement. Transaction announcements are ordered the same way
they're in the block. After all in-block transactions announcements PostPersist
script execution is announced followed by notifications generated during the
script execution. Finally, block announcement is followed.
script execution. Finally, block header is announced followed by the block
announcement itself.
* notary request events announcements are not bound to the chain processing.
Trigger for notary request notifications is notary request mempool content
change, thus, notary request event is announced every time notary request
@ -69,6 +72,12 @@ Recognized stream names:
index starting from which new block notifications will be received and/or
`till` field as an integer values containing block index till which new
block notifications will be received.
* `header_of_added_block`
Filter: `primary` as an integer with primary (speaker) node index from
ConsensusData and/or `since` field as an integer value with header
index starting from which new header notifications will be received and/or
`till` field as an integer values containing header index till which new
header notifications will be received.
* `transaction_added`
Filter: `sender` field containing a string with hex-encoded Uint160 (LE
representation) for transaction's `Sender` and/or `signer` in the same
@ -250,6 +259,47 @@ Example:
}
```
### `header_of_added_block` notification
The first parameter (`params` section) contains a header of added block
converted to a JSON structure, which is similar to a verbose
`getblockheader` response but with the following differences:
* it doesn't have `size` field (you can calculate it client-side)
* it doesn't have `nextblockhash` field (it's supposed to be the latest
one anyway)
* it doesn't have `confirmations` field (see previous)
No other parameters are sent.
Example:
```
{
"jsonrpc": "2.0",
"method": "header_of_added_block",
"params": [
{
"index" : 207,
"time" : 1590006200,
"nextconsensus" : "AXSvJVzydxXuL9da4GVwK25zdesCrVKkHL",
"consensusdata" : {
"primary" : 0,
"nonce" : "0000000000000457"
},
"previousblockhash" : "0x04f7580b111ec75f0ce68d3a9fd70a0544b4521b4a98541694d8575c548b759e",
"witnesses" : [
{
"invocation" : "0c4063429fca5ff75c964d9e38179c75978e33f8174d91a780c2e825265cf2447281594afdd5f3e216dcaf5ff0693aec83f415996cf224454495495f6bd0a4c5d08f0c4099680903a954278580d8533121c2cd3e53a089817b6a784901ec06178a60b5f1da6e70422bdcadc89029767e08d66ce4180b99334cb2d42f42e4216394af15920c4067d5e362189e48839a24e187c59d46f5d9db862c8a029777f1548b19632bfdc73ad373827ed02369f925e89c2303b64e6b9838dca229949b9b9d3bd4c0c3ed8f0c4021d4c00d4522805883f1db929554441bcbbee127c48f6b7feeeb69a72a78c7f0a75011663e239c0820ef903f36168f42936de10f0ef20681cb735a4b53d0390f",
"verification" : "130c2102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e0c2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd620c2102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc20c2103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699140b413073b3bb"
}
],
"version" : 0,
"hash" : "0x239fea00c54c2f6812612874183b72bef4473fcdf68bf8da08d74fd5b6cab030",
"merkleroot" : "0xb2c7230ebee4cb83bc03afadbba413e6bca8fcdeaf9c077bea060918da0e52a1"
}
]
}
```
### `transaction_added` notification
The first parameter (`params` section) contains a transaction converted to

View file

@ -1328,6 +1328,7 @@ func (bc *Blockchain) notificationDispatcher() {
// for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements).
blockFeed = make(map[chan *block.Block]bool)
headerFeed = make(map[chan *block.Header]bool)
txFeed = make(map[chan *transaction.Transaction]bool)
notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan *state.AppExecResult]bool)
@ -1338,6 +1339,8 @@ func (bc *Blockchain) notificationDispatcher() {
return
case sub := <-bc.subCh:
switch ch := sub.(type) {
case chan *block.Header:
headerFeed[ch] = true
case chan *block.Block:
blockFeed[ch] = true
case chan *transaction.Transaction:
@ -1351,6 +1354,8 @@ func (bc *Blockchain) notificationDispatcher() {
}
case unsub := <-bc.unsubCh:
switch ch := unsub.(type) {
case chan *block.Header:
delete(headerFeed, ch)
case chan *block.Block:
delete(blockFeed, ch)
case chan *transaction.Transaction:
@ -1423,6 +1428,9 @@ func (bc *Blockchain) notificationDispatcher() {
}
}
}
for ch := range headerFeed {
ch <- &event.block.Header
}
for ch := range blockFeed {
ch <- event.block
}
@ -2281,6 +2289,16 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
bc.subCh <- ch
}
// SubscribeForHeadersOfAddedBlocks adds given channel to new header event broadcasting, so
// when there is a new block added to the chain you'll receive its header via this
// channel. Make sure it's read from regularly as not reading these events might
// affect other Blockchain functions. Make sure you're not changing the received
// headers, as it may affect the functionality of Blockchain and other
// subscribers.
func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) {
bc.subCh <- ch
}
// SubscribeForTransactions adds given channel to new transaction event
// broadcasting, so when there is a new transaction added to the chain (in a
// block) you'll receive it via this channel. Make sure it's read from regularly
@ -2327,6 +2345,21 @@ unsubloop:
}
}
// UnsubscribeFromHeadersOfAddedBlocks unsubscribes given channel from new
// block's header notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op, but the method can read from this
// channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
}
// UnsubscribeFromTransactions unsubscribes given channel from new transaction
// notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op, but the method can read from this channel (discarding any read data).

View file

@ -22,6 +22,8 @@ const (
ExecutionEventID
// NotaryRequestEventID is used for the `notary_request_event` event.
NotaryRequestEventID
// HeaderOfAddedBlockEventID is used for the `header_of_added_block` event.
HeaderOfAddedBlockEventID
// MissedEventID notifies user of missed events.
MissedEventID EventID = 255
)
@ -39,6 +41,8 @@ func (e EventID) String() string {
return "transaction_executed"
case NotaryRequestEventID:
return "notary_request_event"
case HeaderOfAddedBlockEventID:
return "header_of_added_block"
case MissedEventID:
return "event_missed"
default:
@ -59,6 +63,8 @@ func GetEventIDFromString(s string) (EventID, error) {
return ExecutionEventID, nil
case "notary_request_event":
return NotaryRequestEventID, nil
case "header_of_added_block":
return HeaderOfAddedBlockEventID, nil
case "event_missed":
return MissedEventID, nil
default:

View file

@ -34,9 +34,14 @@ func Matches(f Comparator, r Container) bool {
return true
}
switch f.EventID() {
case neorpc.BlockEventID:
case neorpc.BlockEventID, neorpc.HeaderOfAddedBlockEventID:
filt := filter.(neorpc.BlockFilter)
b := r.EventPayload().(*block.Block)
var b *block.Header
if f.EventID() == neorpc.HeaderOfAddedBlockEventID {
b = r.EventPayload().(*block.Header)
} else {
b = &r.EventPayload().(*block.Block).Header
}
primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex)
sinceOk := filt.Since == nil || *filt.Since <= b.Index
tillOk := filt.Till == nil || b.Index <= *filt.Till

View file

@ -61,6 +61,10 @@ func TestMatches(t *testing.T) {
Header: block.Header{PrimaryIndex: byte(primary), Index: index},
},
}
headerContainer := testContainer{
id: neorpc.HeaderOfAddedBlockEventID,
pld: &block.Header{PrimaryIndex: byte(primary), Index: index},
}
st := vmstate.Halt
goodState := st.String()
badState := "FAULT"
@ -149,6 +153,48 @@ func TestMatches(t *testing.T) {
container: bContainer,
expected: true,
},
{
name: "header, no filter",
comparator: testComparator{id: neorpc.HeaderOfAddedBlockEventID},
container: headerContainer,
expected: true,
},
{
name: "header, primary mismatch",
comparator: testComparator{
id: neorpc.HeaderOfAddedBlockEventID,
filter: neorpc.BlockFilter{Primary: &badPrimary},
},
container: headerContainer,
expected: false,
},
{
name: "header, since mismatch",
comparator: testComparator{
id: neorpc.HeaderOfAddedBlockEventID,
filter: neorpc.BlockFilter{Since: &badHigherIndex},
},
container: headerContainer,
expected: false,
},
{
name: "header, till mismatch",
comparator: testComparator{
id: neorpc.HeaderOfAddedBlockEventID,
filter: neorpc.BlockFilter{Till: &badLowerIndex},
},
container: headerContainer,
expected: false,
},
{
name: "header, filter match",
comparator: testComparator{
id: neorpc.HeaderOfAddedBlockEventID,
filter: neorpc.BlockFilter{Primary: &primary, Since: &index, Till: &index},
},
container: headerContainer,
expected: true,
},
{
name: "transaction, no filter",
comparator: testComparator{id: neorpc.TransactionEventID},

View file

@ -162,6 +162,52 @@ func (r *blockReceiver) Close() {
close(r.ch)
}
// headerOfAddedBlockReceiver stores information about header of added block events subscriber.
type headerOfAddedBlockReceiver struct {
filter *neorpc.BlockFilter
ch chan<- *block.Header
}
// EventID implements neorpc.Comparator interface.
func (r *headerOfAddedBlockReceiver) EventID() neorpc.EventID {
return neorpc.HeaderOfAddedBlockEventID
}
// Filter implements neorpc.Comparator interface.
func (r *headerOfAddedBlockReceiver) Filter() any {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*block.Header):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*block.Header)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) Close() {
close(r.ch)
}
// txReceiver stores information about transaction events subscriber.
type txReceiver struct {
filter *neorpc.TxFilter
@ -520,6 +566,14 @@ readloop:
ntf.Value = new(state.AppExecResult)
case neorpc.NotaryRequestEventID:
ntf.Value = new(result.NotaryRequestEvent)
case neorpc.HeaderOfAddedBlockEventID:
sr, err := c.stateRootInHeader()
if err != nil {
// Client is not initialized.
connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err)
break readloop
}
ntf.Value = &block.New(sr).Header
case neorpc.MissedEventID:
// No value.
default:
@ -747,6 +801,29 @@ func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Blo
return c.performSubscription(params, r)
}
// ReceiveHeadersOfAddedBlocks registers provided channel as a receiver for new
// block's header events. Events can be filtered by the given [neorpc.BlockFilter],
// nil value doesn't add any filter. See WSClient comments for generic
// Receive* behaviour details.
func (c *WSClient) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
if !c.cache.initDone {
return "", errNetworkNotInitialized
}
params := []any{"header_of_added_block"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &headerOfAddedBlockReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// ReceiveTransactions registers provided channel as a receiver for new transaction
// events. Events can be filtered by the given TxFilter, nil value doesn't add any
// filter. See WSClient comments for generic Receive* behaviour details.

View file

@ -379,6 +379,74 @@ func TestWSFilteredSubscriptions(t *testing.T) {
clientCode func(*testing.T, *WSClient)
serverCode func(*testing.T, *params.Params)
}{
{"block header primary",
func(t *testing.T, wsc *WSClient) {
primary := 3
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Primary: &primary}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 3, *filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
require.Equal(t, (*uint32)(nil), filt.Till)
},
},
{"header since",
func(t *testing.T, wsc *WSClient) {
var since uint32 = 3
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Since: &since}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
require.Equal(t, (*uint32)(nil), filt.Till)
},
},
{"header till",
func(t *testing.T, wsc *WSClient) {
var till uint32 = 3
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Till: &till}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
require.Equal(t, (uint32)(3), *filt.Till)
},
},
{"header primary, since and till",
func(t *testing.T, wsc *WSClient) {
var (
since uint32 = 3
primary = 2
till uint32 = 5
)
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{
Primary: &primary,
Since: &since,
Till: &till,
}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 2, *filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
require.Equal(t, uint32(5), *filt.Till)
},
},
{"blocks primary",
func(t *testing.T, wsc *WSClient) {
primary := 3

View file

@ -100,10 +100,12 @@ type (
InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error
P2PSigExtensionsEnabled() bool
SubscribeForBlocks(ch chan *block.Block)
SubscribeForHeadersOfAddedBlocks(ch chan *block.Header)
SubscribeForExecutions(ch chan *state.AppExecResult)
SubscribeForNotifications(ch chan *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan *transaction.Transaction)
UnsubscribeFromBlocks(ch chan *block.Block)
UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header)
UnsubscribeFromExecutions(ch chan *state.AppExecResult)
UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent)
UnsubscribeFromTransactions(ch chan *transaction.Transaction)
@ -151,12 +153,14 @@ type (
subsCounterLock sync.RWMutex
blockSubs int
blockHeaderSubs int
executionSubs int
notificationSubs int
transactionSubs int
notaryRequestSubs int
blockCh chan *block.Block
blockHeaderCh chan *block.Header
executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction
@ -361,6 +365,7 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
blockHeaderCh: make(chan *block.Header),
subEventsToExitCh: make(chan struct{}),
}
}
@ -2730,7 +2735,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (any, *neor
jd := json.NewDecoder(bytes.NewReader(param.RawMessage))
jd.DisallowUnknownFields()
switch event {
case neorpc.BlockEventID:
case neorpc.BlockEventID, neorpc.HeaderOfAddedBlockEventID:
flt := new(neorpc.BlockFilter)
err = jd.Decode(flt)
filter = *flt
@ -2817,6 +2822,11 @@ func (s *Server) subscribeToChannel(event neorpc.EventID) {
s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh)
}
s.notaryRequestSubs++
case neorpc.HeaderOfAddedBlockEventID:
if s.blockHeaderSubs == 0 {
s.chain.SubscribeForHeadersOfAddedBlocks(s.blockHeaderCh)
}
s.blockHeaderSubs++
}
}
@ -2872,6 +2882,11 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
if s.notaryRequestSubs == 0 {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
}
case neorpc.HeaderOfAddedBlockEventID:
s.blockHeaderSubs--
if s.blockHeaderSubs == 0 {
s.chain.UnsubscribeFromHeadersOfAddedBlocks(s.blockHeaderCh)
}
}
}
@ -2921,6 +2936,9 @@ chloop:
Type: e.Type,
NotaryRequest: e.Data.(*payload.P2PNotaryRequest),
}
case header := <-s.blockHeaderCh:
resp.Event = neorpc.HeaderOfAddedBlockEventID
resp.Payload[0] = header
}
s.subsLock.RLock()
subloop:
@ -2973,6 +2991,7 @@ chloop:
s.chain.UnsubscribeFromTransactions(s.transactionCh)
s.chain.UnsubscribeFromNotifications(s.notificationCh)
s.chain.UnsubscribeFromExecutions(s.executionCh)
s.chain.UnsubscribeFromHeadersOfAddedBlocks(s.blockHeaderCh)
if s.chain.P2PSigExtensionsEnabled() {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
}
@ -2985,6 +3004,7 @@ drainloop:
case <-s.notificationCh:
case <-s.transactionCh:
case <-s.notaryRequestCh:
case <-s.blockHeaderCh:
default:
break drainloop
}
@ -2996,6 +3016,7 @@ drainloop:
close(s.notificationCh)
close(s.executionCh)
close(s.notaryRequestCh)
close(s.blockHeaderCh)
// notify Shutdown routine
close(s.subEventsToExitCh)
}

View file

@ -93,7 +93,7 @@ func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id st
func TestSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"}
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"}
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
@ -139,6 +139,8 @@ func TestSubscriptions(t *testing.T) {
break
}
}
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
resp = getNotification(t, respMsgs)
require.Equal(t, neorpc.BlockEventID, resp.Event)
}
@ -278,6 +280,17 @@ func TestFilteredSubscriptions(t *testing.T) {
t.Fatal("unexpected match for faulted execution")
},
},
"header of added block": {
params: `["header_of_added_block", {"primary": 0, "since": 5}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
primary := rmap["primary"].(float64)
require.Equal(t, 0, int(primary))
index := rmap["index"].(float64)
require.Less(t, 4, int(index))
},
},
}
for name, this := range cases {
@ -460,6 +473,44 @@ func TestFilteredBlockSubscriptions(t *testing.T) {
c.Close()
}
func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
const numBlocks = 10
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
defer rpcSrv.Shutdown()
headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`)
var expectedCnt int
for i := 0; i < numBlocks; i++ {
primary := uint32(i % 4)
if primary == 3 {
expectedCnt++
}
b := testchain.NewBlock(t, chain, 1, primary)
require.NoError(t, chain.AddBlock(b))
}
for i := 0; i < expectedCnt; i++ {
var resp = new(neorpc.Notification)
select {
case body := <-respMsgs:
require.NoError(t, json.Unmarshal(body, resp))
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
rmap := resp.Payload[0].(map[string]any)
primary := rmap["primary"].(float64)
require.Equal(t, 3, int(primary))
}
callUnsubscribe(t, c, respMsgs, headerSubID)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}
func TestMaxSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)