diff --git a/docs/notifications.md b/docs/notifications.md index a9f76985e..119130ed1 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -27,11 +27,13 @@ Currently supported events: Filters use conjunctional logic. ## Ordering and persistence guarantees - * new block is only announced after its processing is complete and the chain - is updated to the new height + * new block and header of this block are only announced after block's processing + is complete and the chain is updated to the new height * no disk-level persistence guarantees are given - * new in-block transaction is announced after block processing, but before + * header of newly added block is announced after block processing, but before announcing the block itself + * new in-block transaction is announced after block processing, but before + announcing the block header and the block itself * transaction notifications are only announced for successful transactions * all announcements are being done in the same order they happen on the chain. First, OnPersist script execution is announced followed by notifications generated @@ -40,7 +42,8 @@ Filters use conjunctional logic. transaction announcement. Transaction announcements are ordered the same way they're in the block. After all in-block transactions announcements PostPersist script execution is announced followed by notifications generated during the - script execution. Finally, block announcement is followed. + script execution. Finally, block header is announced followed by the block + announcement itself. * notary request events announcements are not bound to the chain processing. Trigger for notary request notifications is notary request mempool content change, thus, notary request event is announced every time notary request @@ -69,6 +72,12 @@ Recognized stream names: index starting from which new block notifications will be received and/or `till` field as an integer values containing block index till which new block notifications will be received. + * `header_of_added_block` + Filter: `primary` as an integer with primary (speaker) node index from + ConsensusData and/or `since` field as an integer value with header + index starting from which new header notifications will be received and/or + `till` field as an integer values containing header index till which new + header notifications will be received. * `transaction_added` Filter: `sender` field containing a string with hex-encoded Uint160 (LE representation) for transaction's `Sender` and/or `signer` in the same @@ -250,6 +259,47 @@ Example: } ``` +### `header_of_added_block` notification + +The first parameter (`params` section) contains a header of added block +converted to a JSON structure, which is similar to a verbose +`getblockheader` response but with the following differences: + * it doesn't have `size` field (you can calculate it client-side) + * it doesn't have `nextblockhash` field (it's supposed to be the latest + one anyway) + * it doesn't have `confirmations` field (see previous) + +No other parameters are sent. + +Example: +``` +{ + "jsonrpc": "2.0", + "method": "header_of_added_block", + "params": [ + { + "index" : 207, + "time" : 1590006200, + "nextconsensus" : "AXSvJVzydxXuL9da4GVwK25zdesCrVKkHL", + "consensusdata" : { + "primary" : 0, + "nonce" : "0000000000000457" + }, + "previousblockhash" : "0x04f7580b111ec75f0ce68d3a9fd70a0544b4521b4a98541694d8575c548b759e", + "witnesses" : [ + { + "invocation" : "0c4063429fca5ff75c964d9e38179c75978e33f8174d91a780c2e825265cf2447281594afdd5f3e216dcaf5ff0693aec83f415996cf224454495495f6bd0a4c5d08f0c4099680903a954278580d8533121c2cd3e53a089817b6a784901ec06178a60b5f1da6e70422bdcadc89029767e08d66ce4180b99334cb2d42f42e4216394af15920c4067d5e362189e48839a24e187c59d46f5d9db862c8a029777f1548b19632bfdc73ad373827ed02369f925e89c2303b64e6b9838dca229949b9b9d3bd4c0c3ed8f0c4021d4c00d4522805883f1db929554441bcbbee127c48f6b7feeeb69a72a78c7f0a75011663e239c0820ef903f36168f42936de10f0ef20681cb735a4b53d0390f", + "verification" : "130c2102103a7f7dd016558597f7960d27c516a4394fd968b9e65155eb4b013e4040406e0c2102a7bc55fe8684e0119768d104ba30795bdcc86619e864add26156723ed185cd620c2102b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc20c2103d90c07df63e690ce77912e10ab51acc944b66860237b608c4f8f8309e71ee699140b413073b3bb" + } + ], + "version" : 0, + "hash" : "0x239fea00c54c2f6812612874183b72bef4473fcdf68bf8da08d74fd5b6cab030", + "merkleroot" : "0xb2c7230ebee4cb83bc03afadbba413e6bca8fcdeaf9c077bea060918da0e52a1" + } + ] +} +``` + ### `transaction_added` notification The first parameter (`params` section) contains a transaction converted to diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 8cf2251f1..73f95b95c 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -1328,6 +1328,7 @@ func (bc *Blockchain) notificationDispatcher() { // for ease of management (not a lot of subscriptions is really // expected, but maps are convenient for adding/deleting elements). blockFeed = make(map[chan *block.Block]bool) + headerFeed = make(map[chan *block.Header]bool) txFeed = make(map[chan *transaction.Transaction]bool) notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool) executionFeed = make(map[chan *state.AppExecResult]bool) @@ -1338,6 +1339,8 @@ func (bc *Blockchain) notificationDispatcher() { return case sub := <-bc.subCh: switch ch := sub.(type) { + case chan *block.Header: + headerFeed[ch] = true case chan *block.Block: blockFeed[ch] = true case chan *transaction.Transaction: @@ -1351,6 +1354,8 @@ func (bc *Blockchain) notificationDispatcher() { } case unsub := <-bc.unsubCh: switch ch := unsub.(type) { + case chan *block.Header: + delete(headerFeed, ch) case chan *block.Block: delete(blockFeed, ch) case chan *transaction.Transaction: @@ -1423,6 +1428,9 @@ func (bc *Blockchain) notificationDispatcher() { } } } + for ch := range headerFeed { + ch <- &event.block.Header + } for ch := range blockFeed { ch <- event.block } @@ -2281,6 +2289,16 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) { bc.subCh <- ch } +// SubscribeForHeadersOfAddedBlocks adds given channel to new header event broadcasting, so +// when there is a new block added to the chain you'll receive its header via this +// channel. Make sure it's read from regularly as not reading these events might +// affect other Blockchain functions. Make sure you're not changing the received +// headers, as it may affect the functionality of Blockchain and other +// subscribers. +func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) { + bc.subCh <- ch +} + // SubscribeForTransactions adds given channel to new transaction event // broadcasting, so when there is a new transaction added to the chain (in a // block) you'll receive it via this channel. Make sure it's read from regularly @@ -2327,6 +2345,21 @@ unsubloop: } } +// UnsubscribeFromHeadersOfAddedBlocks unsubscribes given channel from new +// block's header notifications, you can close it afterwards. Passing +// non-subscribed channel is a no-op, but the method can read from this +// channel (discarding any read data). +func (bc *Blockchain) UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) { +unsubloop: + for { + select { + case <-ch: + case bc.unsubCh <- ch: + break unsubloop + } + } +} + // UnsubscribeFromTransactions unsubscribes given channel from new transaction // notifications, you can close it afterwards. Passing non-subscribed channel is // a no-op, but the method can read from this channel (discarding any read data). diff --git a/pkg/neorpc/events.go b/pkg/neorpc/events.go index ab01628e5..ea0c1657f 100644 --- a/pkg/neorpc/events.go +++ b/pkg/neorpc/events.go @@ -22,6 +22,8 @@ const ( ExecutionEventID // NotaryRequestEventID is used for the `notary_request_event` event. NotaryRequestEventID + // HeaderOfAddedBlockEventID is used for the `header_of_added_block` event. + HeaderOfAddedBlockEventID // MissedEventID notifies user of missed events. MissedEventID EventID = 255 ) @@ -39,6 +41,8 @@ func (e EventID) String() string { return "transaction_executed" case NotaryRequestEventID: return "notary_request_event" + case HeaderOfAddedBlockEventID: + return "header_of_added_block" case MissedEventID: return "event_missed" default: @@ -59,6 +63,8 @@ func GetEventIDFromString(s string) (EventID, error) { return ExecutionEventID, nil case "notary_request_event": return NotaryRequestEventID, nil + case "header_of_added_block": + return HeaderOfAddedBlockEventID, nil case "event_missed": return MissedEventID, nil default: diff --git a/pkg/neorpc/rpcevent/filter.go b/pkg/neorpc/rpcevent/filter.go index 26014050b..f2be233ad 100644 --- a/pkg/neorpc/rpcevent/filter.go +++ b/pkg/neorpc/rpcevent/filter.go @@ -34,9 +34,14 @@ func Matches(f Comparator, r Container) bool { return true } switch f.EventID() { - case neorpc.BlockEventID: + case neorpc.BlockEventID, neorpc.HeaderOfAddedBlockEventID: filt := filter.(neorpc.BlockFilter) - b := r.EventPayload().(*block.Block) + var b *block.Header + if f.EventID() == neorpc.HeaderOfAddedBlockEventID { + b = r.EventPayload().(*block.Header) + } else { + b = &r.EventPayload().(*block.Block).Header + } primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex) sinceOk := filt.Since == nil || *filt.Since <= b.Index tillOk := filt.Till == nil || b.Index <= *filt.Till diff --git a/pkg/neorpc/rpcevent/filter_test.go b/pkg/neorpc/rpcevent/filter_test.go index d32c696a0..6d8d33aad 100644 --- a/pkg/neorpc/rpcevent/filter_test.go +++ b/pkg/neorpc/rpcevent/filter_test.go @@ -61,6 +61,10 @@ func TestMatches(t *testing.T) { Header: block.Header{PrimaryIndex: byte(primary), Index: index}, }, } + headerContainer := testContainer{ + id: neorpc.HeaderOfAddedBlockEventID, + pld: &block.Header{PrimaryIndex: byte(primary), Index: index}, + } st := vmstate.Halt goodState := st.String() badState := "FAULT" @@ -149,6 +153,48 @@ func TestMatches(t *testing.T) { container: bContainer, expected: true, }, + { + name: "header, no filter", + comparator: testComparator{id: neorpc.HeaderOfAddedBlockEventID}, + container: headerContainer, + expected: true, + }, + { + name: "header, primary mismatch", + comparator: testComparator{ + id: neorpc.HeaderOfAddedBlockEventID, + filter: neorpc.BlockFilter{Primary: &badPrimary}, + }, + container: headerContainer, + expected: false, + }, + { + name: "header, since mismatch", + comparator: testComparator{ + id: neorpc.HeaderOfAddedBlockEventID, + filter: neorpc.BlockFilter{Since: &badHigherIndex}, + }, + container: headerContainer, + expected: false, + }, + { + name: "header, till mismatch", + comparator: testComparator{ + id: neorpc.HeaderOfAddedBlockEventID, + filter: neorpc.BlockFilter{Till: &badLowerIndex}, + }, + container: headerContainer, + expected: false, + }, + { + name: "header, filter match", + comparator: testComparator{ + id: neorpc.HeaderOfAddedBlockEventID, + filter: neorpc.BlockFilter{Primary: &primary, Since: &index, Till: &index}, + }, + container: headerContainer, + expected: true, + }, { name: "transaction, no filter", comparator: testComparator{id: neorpc.TransactionEventID}, diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 50c68c683..962b26455 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -162,6 +162,52 @@ func (r *blockReceiver) Close() { close(r.ch) } +// headerOfAddedBlockReceiver stores information about header of added block events subscriber. +type headerOfAddedBlockReceiver struct { + filter *neorpc.BlockFilter + ch chan<- *block.Header +} + +// EventID implements neorpc.Comparator interface. +func (r *headerOfAddedBlockReceiver) EventID() neorpc.EventID { + return neorpc.HeaderOfAddedBlockEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *headerOfAddedBlockReceiver) Filter() any { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *headerOfAddedBlockReceiver) Receiver() any { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *headerOfAddedBlockReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) { + if rpcevent.Matches(r, ntf) { + if nonBlocking { + select { + case r.ch <- ntf.Value.(*block.Header): + default: + return true, true + } + } else { + r.ch <- ntf.Value.(*block.Header) + } + return true, false + } + return false, false +} + +// Close implements notificationReceiver interface. +func (r *headerOfAddedBlockReceiver) Close() { + close(r.ch) +} + // txReceiver stores information about transaction events subscriber. type txReceiver struct { filter *neorpc.TxFilter @@ -520,6 +566,14 @@ readloop: ntf.Value = new(state.AppExecResult) case neorpc.NotaryRequestEventID: ntf.Value = new(result.NotaryRequestEvent) + case neorpc.HeaderOfAddedBlockEventID: + sr, err := c.stateRootInHeader() + if err != nil { + // Client is not initialized. + connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err) + break readloop + } + ntf.Value = &block.New(sr).Header case neorpc.MissedEventID: // No value. default: @@ -747,6 +801,29 @@ func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Blo return c.performSubscription(params, r) } +// ReceiveHeadersOfAddedBlocks registers provided channel as a receiver for new +// block's header events. Events can be filtered by the given [neorpc.BlockFilter], +// nil value doesn't add any filter. See WSClient comments for generic +// Receive* behaviour details. +func (c *WSClient) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + if !c.cache.initDone { + return "", errNetworkNotInitialized + } + params := []any{"header_of_added_block"} + if flt != nil { + flt = flt.Copy() + params = append(params, *flt) + } + r := &headerOfAddedBlockReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) +} + // ReceiveTransactions registers provided channel as a receiver for new transaction // events. Events can be filtered by the given TxFilter, nil value doesn't add any // filter. See WSClient comments for generic Receive* behaviour details. diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 2555777f8..8247901ac 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -379,6 +379,74 @@ func TestWSFilteredSubscriptions(t *testing.T) { clientCode func(*testing.T, *WSClient) serverCode func(*testing.T, *params.Params) }{ + {"block header primary", + func(t *testing.T, wsc *WSClient) { + primary := 3 + _, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Primary: &primary}, make(chan *block.Header)) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, 3, *filt.Primary) + require.Equal(t, (*uint32)(nil), filt.Since) + require.Equal(t, (*uint32)(nil), filt.Till) + }, + }, + {"header since", + func(t *testing.T, wsc *WSClient) { + var since uint32 = 3 + _, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Since: &since}, make(chan *block.Header)) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, (*int)(nil), filt.Primary) + require.Equal(t, uint32(3), *filt.Since) + require.Equal(t, (*uint32)(nil), filt.Till) + }, + }, + {"header till", + func(t *testing.T, wsc *WSClient) { + var till uint32 = 3 + _, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Till: &till}, make(chan *block.Header)) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, (*int)(nil), filt.Primary) + require.Equal(t, (*uint32)(nil), filt.Since) + require.Equal(t, (uint32)(3), *filt.Till) + }, + }, + {"header primary, since and till", + func(t *testing.T, wsc *WSClient) { + var ( + since uint32 = 3 + primary = 2 + till uint32 = 5 + ) + _, err := wsc.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{ + Primary: &primary, + Since: &since, + Till: &till, + }, make(chan *block.Header)) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, 2, *filt.Primary) + require.Equal(t, uint32(3), *filt.Since) + require.Equal(t, uint32(5), *filt.Till) + }, + }, {"blocks primary", func(t *testing.T, wsc *WSClient) { primary := 3 diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index d79576bee..bb30f566d 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -100,10 +100,12 @@ type ( InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error P2PSigExtensionsEnabled() bool SubscribeForBlocks(ch chan *block.Block) + SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) SubscribeForExecutions(ch chan *state.AppExecResult) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) SubscribeForTransactions(ch chan *transaction.Transaction) UnsubscribeFromBlocks(ch chan *block.Block) + UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header) UnsubscribeFromExecutions(ch chan *state.AppExecResult) UnsubscribeFromNotifications(ch chan *state.ContainedNotificationEvent) UnsubscribeFromTransactions(ch chan *transaction.Transaction) @@ -151,12 +153,14 @@ type ( subsCounterLock sync.RWMutex blockSubs int + blockHeaderSubs int executionSubs int notificationSubs int transactionSubs int notaryRequestSubs int blockCh chan *block.Block + blockHeaderCh chan *block.Header executionCh chan *state.AppExecResult notificationCh chan *state.ContainedNotificationEvent transactionCh chan *transaction.Transaction @@ -361,6 +365,7 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server, notificationCh: make(chan *state.ContainedNotificationEvent), transactionCh: make(chan *transaction.Transaction), notaryRequestCh: make(chan mempoolevent.Event), + blockHeaderCh: make(chan *block.Header), subEventsToExitCh: make(chan struct{}), } } @@ -2730,7 +2735,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (any, *neor jd := json.NewDecoder(bytes.NewReader(param.RawMessage)) jd.DisallowUnknownFields() switch event { - case neorpc.BlockEventID: + case neorpc.BlockEventID, neorpc.HeaderOfAddedBlockEventID: flt := new(neorpc.BlockFilter) err = jd.Decode(flt) filter = *flt @@ -2817,6 +2822,11 @@ func (s *Server) subscribeToChannel(event neorpc.EventID) { s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh) } s.notaryRequestSubs++ + case neorpc.HeaderOfAddedBlockEventID: + if s.blockHeaderSubs == 0 { + s.chain.SubscribeForHeadersOfAddedBlocks(s.blockHeaderCh) + } + s.blockHeaderSubs++ } } @@ -2872,6 +2882,11 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) { if s.notaryRequestSubs == 0 { s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) } + case neorpc.HeaderOfAddedBlockEventID: + s.blockHeaderSubs-- + if s.blockHeaderSubs == 0 { + s.chain.UnsubscribeFromHeadersOfAddedBlocks(s.blockHeaderCh) + } } } @@ -2921,6 +2936,9 @@ chloop: Type: e.Type, NotaryRequest: e.Data.(*payload.P2PNotaryRequest), } + case header := <-s.blockHeaderCh: + resp.Event = neorpc.HeaderOfAddedBlockEventID + resp.Payload[0] = header } s.subsLock.RLock() subloop: @@ -2973,6 +2991,7 @@ chloop: s.chain.UnsubscribeFromTransactions(s.transactionCh) s.chain.UnsubscribeFromNotifications(s.notificationCh) s.chain.UnsubscribeFromExecutions(s.executionCh) + s.chain.UnsubscribeFromHeadersOfAddedBlocks(s.blockHeaderCh) if s.chain.P2PSigExtensionsEnabled() { s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) } @@ -2985,6 +3004,7 @@ drainloop: case <-s.notificationCh: case <-s.transactionCh: case <-s.notaryRequestCh: + case <-s.blockHeaderCh: default: break drainloop } @@ -2996,6 +3016,7 @@ drainloop: close(s.notificationCh) close(s.executionCh) close(s.notaryRequestCh) + close(s.blockHeaderCh) // notify Shutdown routine close(s.subEventsToExitCh) } diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index 6df8e66df..c3d83eda2 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -93,7 +93,7 @@ func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id st func TestSubscriptions(t *testing.T) { var subIDs = make([]string, 0) - var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"} + var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"} chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) defer chain.Close() @@ -139,6 +139,8 @@ func TestSubscriptions(t *testing.T) { break } } + require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event) + resp = getNotification(t, respMsgs) require.Equal(t, neorpc.BlockEventID, resp.Event) } @@ -278,6 +280,17 @@ func TestFilteredSubscriptions(t *testing.T) { t.Fatal("unexpected match for faulted execution") }, }, + "header of added block": { + params: `["header_of_added_block", {"primary": 0, "since": 5}]`, + check: func(t *testing.T, resp *neorpc.Notification) { + rmap := resp.Payload[0].(map[string]any) + require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event) + primary := rmap["primary"].(float64) + require.Equal(t, 0, int(primary)) + index := rmap["index"].(float64) + require.Less(t, 4, int(index)) + }, + }, } for name, this := range cases { @@ -460,6 +473,44 @@ func TestFilteredBlockSubscriptions(t *testing.T) { c.Close() } +func TestHeaderOfAddedBlockSubscriptions(t *testing.T) { + const numBlocks = 10 + chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + + defer chain.Close() + defer rpcSrv.Shutdown() + + headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`) + + var expectedCnt int + for i := 0; i < numBlocks; i++ { + primary := uint32(i % 4) + if primary == 3 { + expectedCnt++ + } + b := testchain.NewBlock(t, chain, 1, primary) + require.NoError(t, chain.AddBlock(b)) + } + + for i := 0; i < expectedCnt; i++ { + var resp = new(neorpc.Notification) + select { + case body := <-respMsgs: + require.NoError(t, json.Unmarshal(body, resp)) + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } + + require.Equal(t, neorpc.HeaderOfAddedBlockEventID, resp.Event) + rmap := resp.Payload[0].(map[string]any) + primary := rmap["primary"].(float64) + require.Equal(t, 3, int(primary)) + } + callUnsubscribe(t, c, respMsgs, headerSubID) + finishedFlag.CompareAndSwap(false, true) + c.Close() +} + func TestMaxSubscriptions(t *testing.T) { var subIDs = make([]string, 0) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)