Merge pull request #3252 from nspcc-dev/header_added

rpc: add header_of_added_block event subscription
This commit is contained in:
Roman Khimov 2023-12-22 15:18:24 +03:00 committed by GitHub
commit 9015215228
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 365 additions and 8 deletions

View file

@ -27,11 +27,13 @@ Currently supported events:
Filters use conjunctional logic. Filters use conjunctional logic.
## Ordering and persistence guarantees ## Ordering and persistence guarantees
* new block is only announced after its processing is complete and the chain * new block and header of this block are only announced after block's processing
is updated to the new height is complete and the chain is updated to the new height
* no disk-level persistence guarantees are given * no disk-level persistence guarantees are given
* new in-block transaction is announced after block processing, but before * header of newly added block is announced after block processing, but before
announcing the block itself announcing the block itself
* new in-block transaction is announced after block processing, but before
announcing the block header and the block itself
* transaction notifications are only announced for successful transactions * transaction notifications are only announced for successful transactions
* all announcements are being done in the same order they happen on the chain. * all announcements are being done in the same order they happen on the chain.
First, OnPersist script execution is announced followed by notifications generated First, OnPersist script execution is announced followed by notifications generated
@ -40,7 +42,8 @@ Filters use conjunctional logic.
transaction announcement. Transaction announcements are ordered the same way transaction announcement. Transaction announcements are ordered the same way
they're in the block. After all in-block transactions announcements PostPersist they're in the block. After all in-block transactions announcements PostPersist
script execution is announced followed by notifications generated during the script execution is announced followed by notifications generated during the
script execution. Finally, block announcement is followed. script execution. Finally, block header is announced followed by the block
announcement itself.
* notary request events announcements are not bound to the chain processing. * notary request events announcements are not bound to the chain processing.
Trigger for notary request notifications is notary request mempool content Trigger for notary request notifications is notary request mempool content
change, thus, notary request event is announced every time notary request change, thus, notary request event is announced every time notary request
@ -69,6 +72,12 @@ Recognized stream names:
index starting from which new block notifications will be received and/or index starting from which new block notifications will be received and/or
`till` field as an integer values containing block index till which new `till` field as an integer values containing block index till which new
block notifications will be received. block notifications will be received.
* `header_of_added_block`
Filter: `primary` as an integer with primary (speaker) node index from
ConsensusData and/or `since` field as an integer value with header
index starting from which new header notifications will be received and/or
`till` field as an integer values containing header index till which new
header notifications will be received.
* `transaction_added` * `transaction_added`
Filter: `sender` field containing a string with hex-encoded Uint160 (LE Filter: `sender` field containing a string with hex-encoded Uint160 (LE
representation) for transaction's `Sender` and/or `signer` in the same representation) for transaction's `Sender` and/or `signer` in the same
@ -250,6 +259,47 @@ Example:
} }
``` ```
### `header_of_added_block` notification
The first parameter (`params` section) contains a header of added block
converted to a JSON structure, which is similar to a verbose
`getblockheader` response but with the following differences:
* it doesn't have `size` field (you can calculate it client-side)
* it doesn't have `nextblockhash` field (it's supposed to be the latest
one anyway)
* it doesn't have `confirmations` field (see previous)
No other parameters are sent.
Example:
```
{
"jsonrpc": "2.0",
"method": "header_of_added_block",
"params": [
{
"index" : 207,
"time" : 1590006200,
"nextconsensus" : "AXSvJVzydxXuL9da4GVwK25zdesCrVKkHL",
"consensusdata" : {
"primary" : 0,
"nonce" : "0000000000000457"
},
"previousblockhash" : "0x04f7580b111ec75f0ce68d3a9fd70a0544b4521b4a98541694d8575c548b759e",
"witnesses" : [
{
"invocation" : "0c4063429fca5ff75c964d9e38179c75978e33f8174d91a780c2e825265cf2447281594afdd5f3e216dcaf5ff0693aec83f415996cf224454495495f6bd0a4c5d08f0c4099680903a954278580d8533121c2cd3e53a089817b6a784901ec06178a60b5f1da6e70422bdcadc89029767e08d66ce4180b99334cb2d42f42e4216394af15920c4067d5e362189e48839a24e187c59d46f5d9db862c8a029777f1548b19632bfdc73ad373827ed02369f925e89c2303b64e6b9838dca229949b9b9d3bd4c0c3ed8f0c4021d4c00d4522805883f1db929554441bcbbee127c48f6b7feeeb69a72a78c7f0a75011663e239c0820ef903f36168f42936de10f0ef20681cb735a4b53d0390f",
"verification" : "130c2102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e0c2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd620c2102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc20c2103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699140b413073b3bb"
}
],
"version" : 0,
"hash" : "0x239fea00c54c2f6812612874183b72bef4473fcdf68bf8da08d74fd5b6cab030",
"merkleroot" : "0xb2c7230ebee4cb83bc03afadbba413e6bca8fcdeaf9c077bea060918da0e52a1"
}
]
}
```
### `transaction_added` notification ### `transaction_added` notification
The first parameter (`params` section) contains a transaction converted to The first parameter (`params` section) contains a transaction converted to

View file

@ -1328,6 +1328,7 @@ func (bc *Blockchain) notificationDispatcher() {
// for ease of management (not a lot of subscriptions is really // for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements). // expected, but maps are convenient for adding/deleting elements).
blockFeed = make(map[chan *block.Block]bool) blockFeed = make(map[chan *block.Block]bool)
headerFeed = make(map[chan *block.Header]bool)
txFeed = make(map[chan *transaction.Transaction]bool) txFeed = make(map[chan *transaction.Transaction]bool)
notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool) notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan *state.AppExecResult]bool) executionFeed = make(map[chan *state.AppExecResult]bool)
@ -1338,6 +1339,8 @@ func (bc *Blockchain) notificationDispatcher() {
return return
case sub := <-bc.subCh: case sub := <-bc.subCh:
switch ch := sub.(type) { switch ch := sub.(type) {
case chan *block.Header:
headerFeed[ch] = true
case chan *block.Block: case chan *block.Block:
blockFeed[ch] = true blockFeed[ch] = true
case chan *transaction.Transaction: case chan *transaction.Transaction:
@ -1351,6 +1354,8 @@ func (bc *Blockchain) notificationDispatcher() {
} }
case unsub := <-bc.unsubCh: case unsub := <-bc.unsubCh:
switch ch := unsub.(type) { switch ch := unsub.(type) {
case chan *block.Header:
delete(headerFeed, ch)
case chan *block.Block: case chan *block.Block:
delete(blockFeed, ch) delete(blockFeed, ch)
case chan *transaction.Transaction: case chan *transaction.Transaction:
@ -1423,6 +1428,9 @@ func (bc *Blockchain) notificationDispatcher() {
} }
} }
} }
for ch := range headerFeed {
ch <- &event.block.Header
}
for ch := range blockFeed { for ch := range blockFeed {
ch <- event.block ch <- event.block
} }
@ -2281,6 +2289,16 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
bc.subCh <- ch bc.subCh <- ch
} }
// SubscribeForHeadersOfAddedBlocks adds given channel to new header event broadcasting, so
// when there is a new block added to the chain you'll receive its header via this
// channel. Make sure it's read from regularly as not reading these events might
// affect other Blockchain functions. Make sure you're not changing the received
// headers, as it may affect the functionality of Blockchain and other
// subscribers.
func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) {
bc.subCh <- ch
}
// SubscribeForTransactions adds given channel to new transaction event // SubscribeForTransactions adds given channel to new transaction event
// broadcasting, so when there is a new transaction added to the chain (in a // broadcasting, so when there is a new transaction added to the chain (in a
// block) you'll receive it via this channel. Make sure it's read from regularly // block) you'll receive it via this channel. Make sure it's read from regularly
@ -2327,6 +2345,21 @@ unsubloop:
} }
} }
// UnsubscribeFromHeadersOfAddedBlocks unsubscribes given channel from new
// block's header notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op, but the method can read from this
// channel (discarding any read data).
func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
}
// UnsubscribeFromTransactions unsubscribes given channel from new transaction // UnsubscribeFromTransactions unsubscribes given channel from new transaction
// notifications, you can close it afterwards. Passing non-subscribed channel is // notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op, but the method can read from this channel (discarding any read data). // a no-op, but the method can read from this channel (discarding any read data).

View file

@ -22,6 +22,8 @@ const (
ExecutionEventID ExecutionEventID
// NotaryRequestEventID is used for the `notary_request_event` event. // NotaryRequestEventID is used for the `notary_request_event` event.
NotaryRequestEventID NotaryRequestEventID
// HeaderOfAddedBlockEventID is used for the `header_of_added_block` event.
HeaderOfAddedBlockEventID
// MissedEventID notifies user of missed events. // MissedEventID notifies user of missed events.
MissedEventID EventID = 255 MissedEventID EventID = 255
) )
@ -39,6 +41,8 @@ func (e EventID) String() string {
return "transaction_executed" return "transaction_executed"
case NotaryRequestEventID: case NotaryRequestEventID:
return "notary_request_event" return "notary_request_event"
case HeaderOfAddedBlockEventID:
return "header_of_added_block"
case MissedEventID: case MissedEventID:
return "event_missed" return "event_missed"
default: default:
@ -59,6 +63,8 @@ func GetEventIDFromString(s string) (EventID, error) {
return ExecutionEventID, nil return ExecutionEventID, nil
case "notary_request_event": case "notary_request_event":
return NotaryRequestEventID, nil return NotaryRequestEventID, nil
case "header_of_added_block":
return HeaderOfAddedBlockEventID, nil
case "event_missed": case "event_missed":
return MissedEventID, nil return MissedEventID, nil
default: default:

View file

@ -34,9 +34,14 @@ func Matches(f Comparator, r Container) bool {
return true return true
} }
switch f.EventID() { switch f.EventID() {
case neorpc.BlockEventID: case neorpc.BlockEventID, neorpc.HeaderOfAddedBlockEventID:
filt := filter.(neorpc.BlockFilter) filt := filter.(neorpc.BlockFilter)
b := r.EventPayload().(*block.Block) var b *block.Header
if f.EventID() == neorpc.HeaderOfAddedBlockEventID {
b = r.EventPayload().(*block.Header)
} else {
b = &r.EventPayload().(*block.Block).Header
}
primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex) primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex)
sinceOk := filt.Since == nil || *filt.Since <= b.Index sinceOk := filt.Since == nil || *filt.Since <= b.Index
tillOk := filt.Till == nil || b.Index <= *filt.Till tillOk := filt.Till == nil || b.Index <= *filt.Till

View file

@ -61,6 +61,10 @@ func TestMatches(t *testing.T) {
Header: block.Header{PrimaryIndex: byte(primary), Index: index}, Header: block.Header{PrimaryIndex: byte(primary), Index: index},
}, },
} }
headerContainer := testContainer{
id: neorpc.HeaderOfAddedBlockEventID,
pld: &block.Header{PrimaryIndex: byte(primary), Index: index},
}
st := vmstate.Halt st := vmstate.Halt
goodState := st.String() goodState := st.String()
badState := "FAULT" badState := "FAULT"
@ -149,6 +153,48 @@ func TestMatches(t *testing.T) {
container: bContainer, container: bContainer,
expected: true, expected: true,
}, },
{
name: "header, no filter",
comparator: testComparator{id: neorpc.HeaderOfAddedBlockEventID},
container: headerContainer,
expected: true,
},
{
name: "header, primary mismatch",
comparator: testComparator{
id: neorpc.HeaderOfAddedBlockEventID,
filter: neorpc.BlockFilter{Primary: &badPrimary},
},
container: headerContainer,
expected: false,
},
{
name: "header, since mismatch",
comparator: testComparator{
id: neorpc.HeaderOfAddedBlockEventID,
filter: neorpc.BlockFilter{Since: &badHigherIndex},
},
container: headerContainer,
expected: false,
},
{
name: "header, till mismatch",
comparator: testComparator{
id: neorpc.HeaderOfAddedBlockEventID,
filter: neorpc.BlockFilter{Till: &badLowerIndex},
},
container: headerContainer,
expected: false,
},
{
name: "header, filter match",
comparator: testComparator{
id: neorpc.HeaderOfAddedBlockEventID,
filter: neorpc.BlockFilter{Primary: &primary, Since: &index, Till: &index},
},
container: headerContainer,
expected: true,
},
{ {
name: "transaction, no filter", name: "transaction, no filter",
comparator: testComparator{id: neorpc.TransactionEventID}, comparator: testComparator{id: neorpc.TransactionEventID},

View file

@ -162,6 +162,52 @@ func (r *blockReceiver) Close() {
close(r.ch) close(r.ch)
} }
// headerOfAddedBlockReceiver stores information about header of added block events subscriber.
type headerOfAddedBlockReceiver struct {
filter *neorpc.BlockFilter
ch chan<- *block.Header
}
// EventID implements neorpc.Comparator interface.
func (r *headerOfAddedBlockReceiver) EventID() neorpc.EventID {
return neorpc.HeaderOfAddedBlockEventID
}
// Filter implements neorpc.Comparator interface.
func (r *headerOfAddedBlockReceiver) Filter() any {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*block.Header):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*block.Header)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) Close() {
close(r.ch)
}
// txReceiver stores information about transaction events subscriber. // txReceiver stores information about transaction events subscriber.
type txReceiver struct { type txReceiver struct {
filter *neorpc.TxFilter filter *neorpc.TxFilter
@ -520,6 +566,14 @@ readloop:
ntf.Value = new(state.AppExecResult) ntf.Value = new(state.AppExecResult)
case neorpc.NotaryRequestEventID: case neorpc.NotaryRequestEventID:
ntf.Value = new(result.NotaryRequestEvent) ntf.Value = new(result.NotaryRequestEvent)
case neorpc.HeaderOfAddedBlockEventID:
sr, err := c.stateRootInHeader()
if err != nil {
// Client is not initialized.
connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err)
break readloop
}
ntf.Value = &block.New(sr).Header
case neorpc.MissedEventID: case neorpc.MissedEventID:
// No value. // No value.
default: default:
@ -747,6 +801,29 @@ func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Blo
return c.performSubscription(params, r) return c.performSubscription(params, r)
} }
// ReceiveHeadersOfAddedBlocks registers provided channel as a receiver for new
// block's header events. Events can be filtered by the given [neorpc.BlockFilter],
// nil value doesn't add any filter. See WSClient comments for generic
// Receive* behaviour details.
func (c *WSClient) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
if !c.cache.initDone {
return "", errNetworkNotInitialized
}
params := []any{"header_of_added_block"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &headerOfAddedBlockReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// ReceiveTransactions registers provided channel as a receiver for new transaction // ReceiveTransactions registers provided channel as a receiver for new transaction
// events. Events can be filtered by the given TxFilter, nil value doesn't add any // events. Events can be filtered by the given TxFilter, nil value doesn't add any
// filter. See WSClient comments for generic Receive* behaviour details. // filter. See WSClient comments for generic Receive* behaviour details.

View file

@ -379,6 +379,74 @@ func TestWSFilteredSubscriptions(t *testing.T) {
clientCode func(*testing.T, *WSClient) clientCode func(*testing.T, *WSClient)
serverCode func(*testing.T, *params.Params) serverCode func(*testing.T, *params.Params)
}{ }{
{"block header primary",
func(t *testing.T, wsc *WSClient) {
primary := 3
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Primary: &primary}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 3, *filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
require.Equal(t, (*uint32)(nil), filt.Till)
},
},
{"header since",
func(t *testing.T, wsc *WSClient) {
var since uint32 = 3
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Since: &since}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
require.Equal(t, (*uint32)(nil), filt.Till)
},
},
{"header till",
func(t *testing.T, wsc *WSClient) {
var till uint32 = 3
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Till: &till}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
require.Equal(t, (uint32)(3), *filt.Till)
},
},
{"header primary, since and till",
func(t *testing.T, wsc *WSClient) {
var (
since uint32 = 3
primary = 2
till uint32 = 5
)
_, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{
Primary: &primary,
Since: &since,
Till: &till,
}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 2, *filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
require.Equal(t, uint32(5), *filt.Till)
},
},
{"blocks primary", {"blocks primary",
func(t *testing.T, wsc *WSClient) { func(t *testing.T, wsc *WSClient) {
primary := 3 primary := 3

View file

@ -100,10 +100,12 @@ type (
InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error
P2PSigExtensionsEnabled() bool P2PSigExtensionsEnabled() bool
SubscribeForBlocks(ch chan *block.Block) SubscribeForBlocks(ch chan *block.Block)
SubscribeForHeadersOfAddedBlocks(ch chan *block.Header)
SubscribeForExecutions(ch chan *state.AppExecResult) SubscribeForExecutions(ch chan *state.AppExecResult)
SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan *transaction.Transaction) SubscribeForTransactions(ch chan *transaction.Transaction)
UnsubscribeFromBlocks(ch chan *block.Block) UnsubscribeFromBlocks(ch chan *block.Block)
UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header)
UnsubscribeFromExecutions(ch chan *state.AppExecResult) UnsubscribeFromExecutions(ch chan *state.AppExecResult)
UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent)
UnsubscribeFromTransactions(ch chan *transaction.Transaction) UnsubscribeFromTransactions(ch chan *transaction.Transaction)
@ -151,12 +153,14 @@ type (
subsCounterLock sync.RWMutex subsCounterLock sync.RWMutex
blockSubs int blockSubs int
blockHeaderSubs int
executionSubs int executionSubs int
notificationSubs int notificationSubs int
transactionSubs int transactionSubs int
notaryRequestSubs int notaryRequestSubs int
blockCh chan *block.Block blockCh chan *block.Block
blockHeaderCh chan *block.Header
executionCh chan *state.AppExecResult executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction transactionCh chan *transaction.Transaction
@ -361,6 +365,7 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
notificationCh: make(chan *state.ContainedNotificationEvent), notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction), transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event), notaryRequestCh: make(chan mempoolevent.Event),
blockHeaderCh: make(chan *block.Header),
subEventsToExitCh: make(chan struct{}), subEventsToExitCh: make(chan struct{}),
} }
} }
@ -2730,7 +2735,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (any, *neor
jd := json.NewDecoder(bytes.NewReader(param.RawMessage)) jd := json.NewDecoder(bytes.NewReader(param.RawMessage))
jd.DisallowUnknownFields() jd.DisallowUnknownFields()
switch event { switch event {
case neorpc.BlockEventID: case neorpc.BlockEventID, neorpc.HeaderOfAddedBlockEventID:
flt := new(neorpc.BlockFilter) flt := new(neorpc.BlockFilter)
err = jd.Decode(flt) err = jd.Decode(flt)
filter = *flt filter = *flt
@ -2817,6 +2822,11 @@ func (s *Server) subscribeToChannel(event neorpc.EventID) {
s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh) s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh)
} }
s.notaryRequestSubs++ s.notaryRequestSubs++
case neorpc.HeaderOfAddedBlockEventID:
if s.blockHeaderSubs == 0 {
s.chain.SubscribeForHeadersOfAddedBlocks(s.blockHeaderCh)
}
s.blockHeaderSubs++
} }
} }
@ -2872,6 +2882,11 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
if s.notaryRequestSubs == 0 { if s.notaryRequestSubs == 0 {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
} }
case neorpc.HeaderOfAddedBlockEventID:
s.blockHeaderSubs--
if s.blockHeaderSubs == 0 {
s.chain.UnsubscribeFromHeadersOfAddedBlocks(s.blockHeaderCh)
}
} }
} }
@ -2921,6 +2936,9 @@ chloop:
Type: e.Type, Type: e.Type,
NotaryRequest: e.Data.(*payload.P2PNotaryRequest), NotaryRequest: e.Data.(*payload.P2PNotaryRequest),
} }
case header := <-s.blockHeaderCh:
resp.Event = neorpc.HeaderOfAddedBlockEventID
resp.Payload[0] = header
} }
s.subsLock.RLock() s.subsLock.RLock()
subloop: subloop:
@ -2973,6 +2991,7 @@ chloop:
s.chain.UnsubscribeFromTransactions(s.transactionCh) s.chain.UnsubscribeFromTransactions(s.transactionCh)
s.chain.UnsubscribeFromNotifications(s.notificationCh) s.chain.UnsubscribeFromNotifications(s.notificationCh)
s.chain.UnsubscribeFromExecutions(s.executionCh) s.chain.UnsubscribeFromExecutions(s.executionCh)
s.chain.UnsubscribeFromHeadersOfAddedBlocks(s.blockHeaderCh)
if s.chain.P2PSigExtensionsEnabled() { if s.chain.P2PSigExtensionsEnabled() {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
} }
@ -2985,6 +3004,7 @@ drainloop:
case <-s.notificationCh: case <-s.notificationCh:
case <-s.transactionCh: case <-s.transactionCh:
case <-s.notaryRequestCh: case <-s.notaryRequestCh:
case <-s.blockHeaderCh:
default: default:
break drainloop break drainloop
} }
@ -2996,6 +3016,7 @@ drainloop:
close(s.notificationCh) close(s.notificationCh)
close(s.executionCh) close(s.executionCh)
close(s.notaryRequestCh) close(s.notaryRequestCh)
close(s.blockHeaderCh)
// notify Shutdown routine // notify Shutdown routine
close(s.subEventsToExitCh) close(s.subEventsToExitCh)
} }

View file

@ -93,7 +93,7 @@ func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id st
func TestSubscriptions(t *testing.T) { func TestSubscriptions(t *testing.T) {
var subIDs = make([]string, 0) var subIDs = make([]string, 0)
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"} var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"}
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close() defer chain.Close()
@ -139,6 +139,8 @@ func TestSubscriptions(t *testing.T) {
break break
} }
} }
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
resp = getNotification(t, respMsgs)
require.Equal(t, neorpc.BlockEventID, resp.Event) require.Equal(t, neorpc.BlockEventID, resp.Event)
} }
@ -278,6 +280,17 @@ func TestFilteredSubscriptions(t *testing.T) {
t.Fatal("unexpected match for faulted execution") t.Fatal("unexpected match for faulted execution")
}, },
}, },
"header of added block": {
params: `["header_of_added_block", {"primary": 0, "since": 5}]`,
check: func(t *testing.T, resp *neorpc.Notification) {
rmap := resp.Payload[0].(map[string]any)
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
primary := rmap["primary"].(float64)
require.Equal(t, 0, int(primary))
index := rmap["index"].(float64)
require.Less(t, 4, int(index))
},
},
} }
for name, this := range cases { for name, this := range cases {
@ -460,6 +473,44 @@ func TestFilteredBlockSubscriptions(t *testing.T) {
c.Close() c.Close()
} }
func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
const numBlocks = 10
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
defer rpcSrv.Shutdown()
headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`)
var expectedCnt int
for i := 0; i < numBlocks; i++ {
primary := uint32(i % 4)
if primary == 3 {
expectedCnt++
}
b := testchain.NewBlock(t, chain, 1, primary)
require.NoError(t, chain.AddBlock(b))
}
for i := 0; i < expectedCnt; i++ {
var resp = new(neorpc.Notification)
select {
case body := <-respMsgs:
require.NoError(t, json.Unmarshal(body, resp))
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event)
rmap := resp.Payload[0].(map[string]any)
primary := rmap["primary"].(float64)
require.Equal(t, 3, int(primary))
}
callUnsubscribe(t, c, respMsgs, headerSubID)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}
func TestMaxSubscriptions(t *testing.T) { func TestMaxSubscriptions(t *testing.T) {
var subIDs = make([]string, 0) var subIDs = make([]string, 0)
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)