Merge pull request #2764 from nspcc-dev/subs-refactoring

rpc: refactor WSClient subscriptions API
This commit is contained in:
Roman Khimov 2022-10-26 19:48:45 +07:00 committed by GitHub
commit b95c135856
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 748 additions and 387 deletions

View file

@ -17,7 +17,7 @@ Currently supported events:
* notification generated during execution
Contents: container hash, contract hash, notification name, stack item. Filters: contract hash, notification name.
* transaction executed
* transaction/persisting script executed
Contents: application execution result. Filters: VM state, script container hash.
* new/removed P2P notary request (if `P2PSigExtensions` are enabled)
@ -34,9 +34,13 @@ Filters use conjunctional logic.
announcing the block itself
* transaction notifications are only announced for successful transactions
* all announcements are being done in the same order they happen on the chain
First, transaction execution is announced. It is then followed by notifications
generated during this execution. Next, follows the transaction announcement.
Transaction announcements are ordered the same way they're in the block.
First, OnPersist script execution is announced followed by notifications generated
during the script execution. After that transaction execution is announced. It is
then followed by notifications generated during this execution. Next, follows the
transaction announcement. Transaction announcements are ordered the same way
they're in the block. After all in-block transactions announcements PostPersist
script execution is announced followed by notifications generated during the
script execution. Finally, block announcement is followed.
* unsubscription may not cancel pending, but not yet sent events
## Subscription management
@ -72,7 +76,7 @@ Recognized stream names:
* `transaction_executed`
Filter: `state` field containing `HALT` or `FAULT` string for successful
and failed executions respectively and/or `container` field containing
script container hash.
script container (block/transaction) hash.
* `notary_request_event`
Filter: `sender` field containing a string with hex-encoded Uint160 (LE
representation) for notary request's `Sender` and/or `signer` in the same
@ -343,8 +347,10 @@ Example:
### `transaction_executed` notification
It contains the same result as from `getapplicationlog` method in the first
parameter and no other parameters. The only difference from `getapplicationlog` is
that it always contains zero in the `contract` field.
parameter and no other parameters. The difference from `getapplicationlog` is
that it has block's or transaction's hex-encoded LE Uint256 hash in the `container`
field instead of two separate `txid` and `blockhash` fields and a single execution
instead of an executions array.
Example:
```
@ -353,61 +359,31 @@ Example:
"params" : [
{
"container" : "0xe1cd5e57e721d2a2e05fb1f08721b12057b25ab1dd7fd0f33ee1639932fdfad7",
"executions" : [
"trigger" : "Application",
"gasconsumed" : "2.291",
"stack" : [],
"notifications" : [
{
"trigger" : "Application",
"gasconsumed" : "2.291",
"contract" : "0x0000000000000000000000000000000000000000",
"stack" : [],
"notifications" : [
{
"state" : {
"type" : "Array",
"value" : [
{
"value" : "636f6e74726163742063616c6c",
"type" : "ByteString"
},
{
"type" : "ByteString",
"value" : "7472616e73666572"
},
{
"value" : [
{
"value" : "769162241eedf97c2481652adf1ba0f5bf57431b",
"type" : "ByteString"
},
{
"type" : "ByteString",
"value" : "316e851039019d39dfc2c37d6c3fee19fd580987"
},
{
"value" : "1000",
"type" : "Integer"
}
],
"type" : "Array"
}
]
"state" : {
"type" : "Array",
"value" : [
{
"value" : "636f6e74726163742063616c6c",
"type" : "ByteString"
},
"contract" : "0x1b4357bff5a01bdf2a6581247cf9ed1e24629176"
},
{
"contract" : "0x1b4357bff5a01bdf2a6581247cf9ed1e24629176",
"state" : {
{
"type" : "ByteString",
"value" : "7472616e73666572"
},
{
"value" : [
{
"value" : "7472616e73666572",
"type" : "ByteString"
},
{
"value" : "769162241eedf97c2481652adf1ba0f5bf57431b",
"type" : "ByteString"
},
{
"value" : "316e851039019d39dfc2c37d6c3fee19fd580987",
"type" : "ByteString"
"type" : "ByteString",
"value" : "316e851039019d39dfc2c37d6c3fee19fd580987"
},
{
"value" : "1000",
@ -416,11 +392,36 @@ Example:
],
"type" : "Array"
}
}
],
"vmstate" : "HALT"
]
},
"contract" : "0x1b4357bff5a01bdf2a6581247cf9ed1e24629176"
},
{
"contract" : "0x1b4357bff5a01bdf2a6581247cf9ed1e24629176",
"state" : {
"value" : [
{
"value" : "7472616e73666572",
"type" : "ByteString"
},
{
"value" : "769162241eedf97c2481652adf1ba0f5bf57431b",
"type" : "ByteString"
},
{
"value" : "316e851039019d39dfc2c37d6c3fee19fd580987",
"type" : "ByteString"
},
{
"value" : "1000",
"type" : "Integer"
}
],
"type" : "Array"
}
}
]
],
"vmstate" : "HALT"
}
],
"jsonrpc" : "2.0"

View file

@ -72,29 +72,34 @@ type (
}
// BlockFilter is a wrapper structure for the block event filter. It allows
// to filter blocks by primary index and by block index (allowing blocks since
// the specified index).
// to filter blocks by primary index and/or by block index (allowing blocks
// since/till the specified index inclusively). nil value treated as missing
// filter.
BlockFilter struct {
Primary *int `json:"primary,omitempty"`
Since *uint32 `json:"since,omitempty"`
Till *uint32 `json:"till,omitempty"`
}
// TxFilter is a wrapper structure for the transaction event filter. It
// allows to filter transactions by senders and signers.
// allows to filter transactions by senders and/or signers. nil value treated
// as missing filter.
TxFilter struct {
Sender *util.Uint160 `json:"sender,omitempty"`
Signer *util.Uint160 `json:"signer,omitempty"`
}
// NotificationFilter is a wrapper structure representing a filter used for
// notifications generated during transaction execution. Notifications can
// be filtered by contract hash and by name.
// be filtered by contract hash and/or by name. nil value treated as missing
// filter.
NotificationFilter struct {
Contract *util.Uint160 `json:"contract,omitempty"`
Name *string `json:"name,omitempty"`
}
// ExecutionFilter is a wrapper structure used for transaction execution
// events. It allows to choose failing or successful transactions based
// on their VM state.
// ExecutionFilter is a wrapper structure used for transaction and persisting
// scripts execution events. It allows to choose failing or successful
// transactions and persisting scripts based on their VM state and/or to
// choose execution event with the specified container. nil value treated as
// missing filter.
ExecutionFilter struct {
State *string `json:"state,omitempty"`
Container *util.Uint256 `json:"container,omitempty"`

View file

@ -6,10 +6,10 @@ import (
"fmt"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
)
@ -30,6 +30,9 @@ var (
// ErrAwaitingNotSupported is returned from Wait method if Waiter instance
// doesn't support transaction awaiting.
ErrAwaitingNotSupported = errors.New("awaiting not supported")
// ErrMissedEvent is returned when RPCEventWaiter closes receiver channel
// which happens if missed event was received from the RPC server.
ErrMissedEvent = errors.New("some event was missed")
)
type (
@ -65,8 +68,8 @@ type (
RPCEventWaiter interface {
RPCPollingWaiter
SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error)
SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error)
ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error)
ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error)
Unsubscribe(id string) error
}
)
@ -223,22 +226,27 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
}
}
}()
rcvr := make(chan rpcclient.Notification)
bRcvr := make(chan *block.Block)
aerRcvr := make(chan *state.AppExecResult)
defer func() {
drainLoop:
// Drain rcvr to avoid other notification receivers blocking.
// Drain receivers to avoid other notification receivers blocking.
for {
select {
case <-rcvr:
case <-bRcvr:
case <-aerRcvr:
default:
break drainLoop
}
}
close(rcvr)
if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) {
close(bRcvr)
close(aerRcvr)
}
}()
// Execution event follows the block event, thus wait until the block next to the VUB to be sure.
since := vub + 1
blocksID, err := w.ws.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr)
// Execution event precedes the block event, thus wait until the VUB-th block to be sure.
since := vub
blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
return
@ -256,7 +264,7 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
}
}()
for _, h := range hashes {
txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr)
txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return
@ -275,27 +283,25 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
}()
}
for {
select {
case ntf := <-rcvr:
switch ntf.Type {
case neorpc.BlockEventID:
waitErr = ErrTxNotAccepted
return
case neorpc.ExecutionEventID:
res = ntf.Value.(*state.AppExecResult)
return
case neorpc.MissedEventID:
// We're toast, retry with non-ws client.
wsWaitErr = errors.New("some event was missed")
return
}
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
return
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
select {
case _, ok := <-bRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
return
}
waitErr = ErrTxNotAccepted
case aer, ok := <-aerRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
return
}
res = aer
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
}
return
}

View file

@ -11,7 +11,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
)
@ -20,20 +19,20 @@ type AwaitableRPCClient struct {
RPCClient
chLock sync.RWMutex
subBlockCh chan<- rpcclient.Notification
subTxCh chan<- rpcclient.Notification
subBlockCh chan<- *block.Block
subTxCh chan<- *state.AppExecResult
}
func (c *AwaitableRPCClient) SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) {
func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
c.chLock.Lock()
defer c.chLock.Unlock()
c.subBlockCh = rcvrCh
c.subBlockCh = rcvr
return "1", nil
}
func (c *AwaitableRPCClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) {
func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) {
c.chLock.Lock()
defer c.chLock.Unlock()
c.subTxCh = rcvrCh
c.subTxCh = rcvr
return "2", nil
}
func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil }
@ -163,10 +162,7 @@ func TestWSWaiter_Wait(t *testing.T) {
check(t, func() {
c.chLock.RLock()
defer c.chLock.RUnlock()
c.subBlockCh <- rpcclient.Notification{
Type: neorpc.ExecutionEventID,
Value: expected,
}
c.subTxCh <- expected
})
// Missing AER after VUB.
@ -178,9 +174,6 @@ func TestWSWaiter_Wait(t *testing.T) {
check(t, func() {
c.chLock.RLock()
defer c.chLock.RUnlock()
c.subBlockCh <- rpcclient.Notification{
Type: neorpc.BlockEventID,
Value: &block.Block{},
}
c.subBlockCh <- &block.Block{}
})
}

View file

@ -34,6 +34,11 @@ type WSClient struct {
// WSClient to block even regular requests. This channel is not buffered.
// In case of protocol error or upon connection closure, this channel will
// be closed, so make sure to handle this.
//
// Deprecated: please, use custom channels with ReceiveBlocks, ReceiveTransactions,
// ReceiveExecutionNotifications, ReceiveExecutions, ReceiveNotaryRequests
// methods to subscribe for notifications. This field will be removed in future
// versions.
Notifications chan Notification
ws *websocket.Conn
@ -47,29 +52,261 @@ type WSClient struct {
subscriptionsLock sync.RWMutex
subscriptions map[string]notificationReceiver
// receivers is a mapping from receiver channel to a set of corresponding subscription IDs.
// It must be accessed with subscriptionsLock taken. Its keys must be used to deliver
// notifications, if channel is not in the receivers list and corresponding subscription
// still exists, notification must not be sent.
receivers map[interface{}][]string
respLock sync.RWMutex
respChannels map[uint64]chan *neorpc.Response
}
// notificationReceiver is a server events receiver. It stores desired notifications ID
// and filter and a channel used to receive matching notifications.
type notificationReceiver struct {
typ neorpc.EventID
filter interface{}
ch chan<- Notification
// notificationReceiver is an interface aimed to provide WS subscriber functionality
// for different types of subscriptions.
type notificationReceiver interface {
// Comparator provides notification filtering functionality.
rpcevent.Comparator
// Receiver returns notification receiver channel.
Receiver() interface{}
// TrySend checks whether notification passes receiver filter and sends it
// to the underlying channel if so.
TrySend(ntf Notification) bool
// Close closes underlying receiver channel.
Close()
}
// EventID implements neorpc.Comparator interface and returns notification ID.
func (r notificationReceiver) EventID() neorpc.EventID {
return r.typ
// blockReceiver stores information about block events subscriber.
type blockReceiver struct {
filter *neorpc.BlockFilter
ch chan<- *block.Block
}
// Filter implements neorpc.Comparator interface and returns notification filter.
func (r notificationReceiver) Filter() interface{} {
// EventID implements neorpc.Comparator interface.
func (r *blockReceiver) EventID() neorpc.EventID {
return neorpc.BlockEventID
}
// Filter implements neorpc.Comparator interface.
func (r *blockReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *blockReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *blockReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*block.Block)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *blockReceiver) Close() {
close(r.ch)
}
// txReceiver stores information about transaction events subscriber.
type txReceiver struct {
filter *neorpc.TxFilter
ch chan<- *transaction.Transaction
}
// EventID implements neorpc.Comparator interface.
func (r *txReceiver) EventID() neorpc.EventID {
return neorpc.TransactionEventID
}
// Filter implements neorpc.Comparator interface.
func (r *txReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *txReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *txReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*transaction.Transaction)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *txReceiver) Close() {
close(r.ch)
}
// executionNotificationReceiver stores information about execution notifications subscriber.
type executionNotificationReceiver struct {
filter *neorpc.NotificationFilter
ch chan<- *state.ContainedNotificationEvent
}
// EventID implements neorpc.Comparator interface.
func (r *executionNotificationReceiver) EventID() neorpc.EventID {
return neorpc.NotificationEventID
}
// Filter implements neorpc.Comparator interface.
func (r *executionNotificationReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *executionNotificationReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *executionNotificationReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*state.ContainedNotificationEvent)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *executionNotificationReceiver) Close() {
close(r.ch)
}
// executionReceiver stores information about application execution results subscriber.
type executionReceiver struct {
filter *neorpc.ExecutionFilter
ch chan<- *state.AppExecResult
}
// EventID implements neorpc.Comparator interface.
func (r *executionReceiver) EventID() neorpc.EventID {
return neorpc.ExecutionEventID
}
// Filter implements neorpc.Comparator interface.
func (r *executionReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *executionReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *executionReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*state.AppExecResult)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *executionReceiver) Close() {
close(r.ch)
}
// notaryRequestReceiver stores information about notary requests subscriber.
type notaryRequestReceiver struct {
filter *neorpc.TxFilter
ch chan<- *result.NotaryRequestEvent
}
// EventID implements neorpc.Comparator interface.
func (r *notaryRequestReceiver) EventID() neorpc.EventID {
return neorpc.NotaryRequestEventID
}
// Filter implements neorpc.Comparator interface.
func (r *notaryRequestReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *notaryRequestReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *notaryRequestReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*result.NotaryRequestEvent)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *notaryRequestReceiver) Close() {
close(r.ch)
}
// naiveReceiver is a structure leaved for deprecated single channel based notifications
// delivering.
//
// Deprecated: this receiver must be removed after outdated subscriptions API removal.
type naiveReceiver struct {
eventID neorpc.EventID
filter interface{}
ch chan<- Notification
}
// EventID implements neorpc.Comparator interface.
func (r *naiveReceiver) EventID() neorpc.EventID {
return r.eventID
}
// Filter implements neorpc.Comparator interface.
func (r *naiveReceiver) Filter() interface{} {
return r.filter
}
// Receiver implements notificationReceiver interface.
func (r *naiveReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *naiveReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *naiveReceiver) Close() {
r.ch <- Notification{
Type: neorpc.MissedEventID, // backwards-compatible behaviour
}
}
// Notification represents a server-generated notification for client subscriptions.
// Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent
// *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type.
@ -111,6 +348,9 @@ const (
wsWriteLimit = wsPingPeriod / 2
)
// ErrNilNotificationReceiver is returned when notification receiver channel is nil.
var ErrNilNotificationReceiver = errors.New("nil notification receiver")
// errConnClosedByUser is a WSClient error used iff the user calls (*WSClient).Close method by himself.
var errConnClosedByUser = errors.New("connection closed by user")
@ -138,6 +378,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
respChannels: make(map[uint64]chan *neorpc.Response),
requests: make(chan *neorpc.Request),
subscriptions: make(map[string]notificationReceiver),
receivers: make(map[interface{}][]string),
}
err = initClient(ctx, &wsc.Client, endpoint, opts)
@ -237,13 +478,22 @@ readloop:
break readloop
}
}
ok := make(map[chan<- Notification]bool)
if event == neorpc.MissedEventID {
c.subscriptionsLock.Lock()
for rcvr, ids := range c.receivers {
c.subscriptions[ids[0]].Close()
delete(c.receivers, rcvr)
}
c.subscriptionsLock.Unlock()
continue readloop
}
c.subscriptionsLock.RLock()
for _, rcvr := range c.subscriptions {
ntf := Notification{Type: event, Value: val}
if (rpcevent.Matches(rcvr, ntf) || event == neorpc.MissedEventID /*missed event must be delivered to each receiver*/) && !ok[rcvr.ch] {
ok[rcvr.ch] = true // strictly one notification per channel
rcvr.ch <- ntf // this will block other receivers
ntf := Notification{Type: event, Value: val}
for _, ids := range c.receivers {
for _, id := range ids {
if c.subscriptions[id].TrySend(ntf) {
break // strictly one notification per channel
}
}
}
c.subscriptionsLock.RUnlock()
@ -370,92 +620,109 @@ func (c *WSClient) performSubscription(params []interface{}, rcvr notificationRe
defer c.subscriptionsLock.Unlock()
c.subscriptions[resp] = rcvr
ch := rcvr.Receiver()
c.receivers[ch] = append(c.receivers[ch], resp)
return resp, nil
}
func (c *WSClient) performUnsubscription(id string) error {
var resp bool
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
if _, ok := c.subscriptions[id]; !ok {
return errors.New("no subscription with this ID")
}
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
delete(c.subscriptions, id)
return nil
return c.removeSubscription(id)
}
// SubscribeForNewBlocks adds subscription for new block events to this instance
// of the client. It can be filtered by primary consensus node index and/or block
// index allowing to receive blocks since the specified index only, nil value is
// treated as missing filter.
// of the client. It can be filtered by primary consensus node index, nil value doesn't
// add any filters.
//
// Deprecated: please, use SubscribeForNewBlocksWithChan. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) {
return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications)
}
// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the
// new block events. Events can be filtered by primary consensus node index, nil
// value doesn't add any filters. If the receiver channel is nil, then the default
// Notifications channel will be used. The receiver channel must be properly read
// and drained after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
// Deprecated: please, use ReceiveBlocks. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) {
var flt *neorpc.BlockFilter
if primary != nil {
flt = &neorpc.BlockFilter{Primary: primary}
}
params := []interface{}{"block_added"}
var flt *neorpc.BlockFilter
if primary != nil || sinceIndex != nil || tillIndex != nil {
flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex}
params = append(params, flt)
if flt != nil {
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.BlockEventID,
r := &naiveReceiver{
eventID: neorpc.BlockEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, r)
}
// ReceiveBlocks registers provided channel as a receiver for the new block events.
// Events can be filtered by the given BlockFilter, nil value doesn't add any filter.
// The receiver channel must be properly read and drained after usage in order not
// to block other notification receivers. If multiple subscriptions share the same
// receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"block_added"}
if flt != nil {
params = append(params, *flt)
}
r := &blockReceiver{
filter: flt,
ch: rcvrCh,
ch: rcvr,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// SubscribeForNewTransactions adds subscription for new transaction events to
// this instance of the client. It can be filtered by the sender and/or the signer, nil
// value is treated as missing filter.
//
// Deprecated: please, use SubscribeForNewTransactionsWithChan. This method will be removed in future versions.
// Deprecated: please, use ReceiveTransactions. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) {
return c.SubscribeForNewTransactionsWithChan(sender, signer, c.Notifications)
}
// SubscribeForNewTransactionsWithChan registers provided channel as a receiver
// for new transaction events. Events can be filtered by the sender and/or the
// signer, nil value is treated as missing filter. If the receiver channel is nil,
// then the default Notifications channel will be used. The receiver channel must be
// properly read and drained after usage in order not to block other notification
// receivers.
func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, signer *util.Uint160, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
params := []interface{}{"transaction_added"}
var flt *neorpc.TxFilter
if sender != nil || signer != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: signer}
}
params := []interface{}{"transaction_added"}
if flt != nil {
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.TransactionEventID,
filter: flt,
ch: rcvrCh,
r := &naiveReceiver{
eventID: neorpc.TransactionEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// ReceiveTransactions registers provided channel as a receiver for new transaction
// events. Events can be filtered by the given TxFilter, nil value doesn't add any
// filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share
// the same receiver channel, then matching notification is only sent once per channel.
// The receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveTransactions(flt *neorpc.TxFilter, rcvr chan<- *transaction.Transaction) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"transaction_added"}
if flt != nil {
params = append(params, *flt)
}
r := &txReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForExecutionNotifications adds subscription for notifications
@ -463,74 +730,102 @@ func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, sig
// filtered by the contract's hash (that emits notifications), nil value puts no such
// restrictions.
//
// Deprecated: please, use SubscribeForExecutionNotificationsWithChan. This method will be removed in future versions.
// Deprecated: please, use ReceiveExecutionNotifications. This method will be removed in future versions.
func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) {
return c.SubscribeForExecutionNotificationsWithChan(contract, name, c.Notifications)
}
// SubscribeForExecutionNotificationsWithChan registers provided channel as a
// receiver for execution events. Events can be filtered by the contract's hash
// (that emits notifications), nil value puts no such restrictions. If the
// receiver channel is nil, then the default Notifications channel will be used.
// The receiver channel must be properly read and drained after usage in order
// not to block other notification receivers.
func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uint160, name *string, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
params := []interface{}{"notification_from_execution"}
var flt *neorpc.NotificationFilter
if contract != nil || name != nil {
flt = &neorpc.NotificationFilter{Contract: contract, Name: name}
}
params := []interface{}{"notification_from_execution"}
if flt != nil {
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.NotificationEventID,
filter: flt,
ch: rcvrCh,
r := &naiveReceiver{
eventID: neorpc.NotificationEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// ReceiveExecutionNotifications registers provided channel as a receiver for execution
// events. Events can be filtered by the given NotificationFilter, nil value doesn't add
// any filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share the
// same receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"notification_from_execution"}
if flt != nil {
params = append(params, *flt)
}
r := &executionNotificationReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForTransactionExecutions adds subscription for application execution
// results generated during transaction execution to this instance of the client. It can
// be filtered by state (HALT/FAULT) to check for successful or failing
// transactions; it can also be filtered by script container hash.
// nil value means no filtering.
// transactions, nil value means no filtering.
//
// Deprecated: please, use SubscribeForTransactionExecutionsWithChan. This method will be removed in future versions.
func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) {
return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications)
}
// SubscribeForTransactionExecutionsWithChan registers provided channel as a
// receiver for application execution result events generated during transaction
// execution. Events can be filtered by state (HALT/FAULT) to check for successful
// or failing transactions; it can also be filtered by script container hash.
// nil value means no filtering. If the receiver channel is nil, then the default
// Notifications channel will be used. The receiver channel must be properly read
// and drained after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
// Deprecated: please, use ReceiveExecutions. This method will be removed in future versions.
func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) {
var flt *neorpc.ExecutionFilter
if state != nil {
flt = &neorpc.ExecutionFilter{State: state}
}
params := []interface{}{"transaction_executed"}
var flt *neorpc.ExecutionFilter
if state != nil || container != nil {
if state != nil {
if *state != "HALT" && *state != "FAULT" {
if flt != nil {
if flt.State != nil {
if *flt.State != "HALT" && *flt.State != "FAULT" {
return "", errors.New("bad state parameter")
}
}
flt = &neorpc.ExecutionFilter{State: state, Container: container}
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.ExecutionEventID,
filter: flt,
ch: rcvrCh,
r := &naiveReceiver{
eventID: neorpc.ExecutionEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// ReceiveExecutions registers provided channel as a receiver for
// application execution result events generated during transaction execution.
// Events can be filtered by the given ExecutionFilter, nil value doesn't add any filter.
// The receiver channel must be properly read and drained after usage in order not
// to block other notification receivers. If multiple subscriptions share the same
// receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"transaction_executed"}
if flt != nil {
if flt.State != nil {
if *flt.State != "HALT" && *flt.State != "FAULT" {
return "", errors.New("bad state parameter")
}
}
params = append(params, *flt)
}
r := &executionReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForNotaryRequests adds subscription for notary request payloads
@ -538,33 +833,47 @@ func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, cont
// request sender's hash, or main tx signer's hash, nil value puts no such
// restrictions.
//
// Deprecated: please, use SubscribeForNotaryRequestsWithChan. This method will be removed in future versions.
// Deprecated: please, use ReceiveNotaryRequests. This method will be removed in future versions.
func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) {
return c.SubscribeForNotaryRequestsWithChan(sender, mainSigner, c.Notifications)
}
// SubscribeForNotaryRequestsWithChan registers provided channel as a receiver
// for notary request payload addition or removal events. It can be filtered by
// request sender's hash, or main tx signer's hash, nil value puts no such
// restrictions. If the receiver channel is nil, then the default Notifications
// channel will be used. The receiver channel must be properly read and drained
// after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForNotaryRequestsWithChan(sender *util.Uint160, mainSigner *util.Uint160, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
var flt *neorpc.TxFilter
if sender != nil || mainSigner != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner}
}
params := []interface{}{"notary_request_event"}
var flt *neorpc.TxFilter
if sender != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner}
if flt != nil {
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.NotaryRequestEventID,
filter: flt,
ch: rcvrCh,
r := &naiveReceiver{
eventID: neorpc.NotaryRequestEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// ReceiveNotaryRequests registers provided channel as a receiver for notary request
// payload addition or removal events. Events can be filtered by the given TxFilter
// where sender corresponds to notary request sender (the second fallback transaction
// signer) and signer corresponds to main transaction signers. nil value doesn't add
// any filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share
// the same receiver channel, then matching notification is only sent once per channel.
// The receiver channel will be closed by the WSClient immediately after MissedEvent
// is received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveNotaryRequests(flt *neorpc.TxFilter, rcvr chan<- *result.NotaryRequestEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"notary_request_event"}
if flt != nil {
params = append(params, *flt)
}
r := &notaryRequestReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// Unsubscribe removes subscription for the given event stream.
@ -578,18 +887,43 @@ func (c *WSClient) UnsubscribeAll() error {
defer c.subscriptionsLock.Unlock()
for id := range c.subscriptions {
var resp bool
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
err := c.removeSubscription(id)
if err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
delete(c.subscriptions, id)
}
return nil
}
// removeSubscription is internal method that removes subscription with the given
// ID from the list of subscriptions and receivers. It must be performed under
// subscriptions lock.
func (c *WSClient) removeSubscription(id string) error {
var resp bool
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
rcvr := c.subscriptions[id]
ch := rcvr.Receiver()
ids := c.receivers[ch]
for i, rcvrID := range ids {
if rcvrID == id {
ids = append(ids[:i], ids[i+1:]...)
break
}
}
if len(ids) == 0 {
delete(c.receivers, ch)
} else {
c.receivers[ch] = ids
}
delete(c.subscriptions, id)
return nil
}
// setCloseErr is a thread-safe method setting closeErr in case if it's not yet set.
func (c *WSClient) setCloseErr(err error) {
c.closeErrLock.Lock()

View file

@ -15,11 +15,16 @@ import (
"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/rpcsrv/params"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
@ -32,31 +37,26 @@ func TestWSClientClose(t *testing.T) {
}
func TestWSClientSubscription(t *testing.T) {
ch := make(chan Notification)
bCh := make(chan *block.Block)
txCh := make(chan *transaction.Transaction)
aerCh := make(chan *state.AppExecResult)
ntfCh := make(chan *state.ContainedNotificationEvent)
ntrCh := make(chan *result.NotaryRequestEvent)
var cases = map[string]func(*WSClient) (string, error){
"blocks": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, nil)
},
"blocks_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch)
return wsc.ReceiveBlocks(nil, bCh)
},
"transactions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewTransactionsWithChan(nil, nil, nil)
},
"transactions_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewTransactionsWithChan(nil, nil, ch)
return wsc.ReceiveTransactions(nil, txCh)
},
"notifications": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, nil)
},
"notifications_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch)
return wsc.ReceiveExecutionNotifications(nil, ntfCh)
},
"executions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, nil)
return wsc.ReceiveExecutions(nil, aerCh)
},
"executions_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch)
"notary requests": func(wsc *WSClient) (string, error) {
return wsc.ReceiveNotaryRequests(nil, ntrCh)
},
}
t.Run("good", func(t *testing.T) {
@ -96,13 +96,13 @@ func TestWSClientUnsubscription(t *testing.T) {
var cases = map[string]responseCheck{
"good": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = notificationReceiver{}
wsc.subscriptions["0"] = &blockReceiver{}
err := wsc.Unsubscribe("0")
require.NoError(t, err)
}},
"all": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = notificationReceiver{}
wsc.subscriptions["0"] = &blockReceiver{}
err := wsc.UnsubscribeAll()
require.NoError(t, err)
require.Equal(t, 0, len(wsc.subscriptions))
@ -113,13 +113,13 @@ func TestWSClientUnsubscription(t *testing.T) {
}},
"error returned": {`{"jsonrpc": "2.0", "id": 1, "error":{"code":-32602,"message":"Invalid Params"}}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = notificationReceiver{}
wsc.subscriptions["0"] = &blockReceiver{}
err := wsc.Unsubscribe("0")
require.Error(t, err)
}},
"false returned": {`{"jsonrpc": "2.0", "id": 1, "result": false}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = notificationReceiver{}
wsc.subscriptions["0"] = &blockReceiver{}
err := wsc.Unsubscribe("0")
require.Error(t, err)
}},
@ -144,7 +144,7 @@ func TestWSClientEvents(t *testing.T) {
`{"jsonrpc":"2.0","method":"notification_from_execution","params":[{"container":"0xe1cd5e57e721d2a2e05fb1f08721b12057b25ab1dd7fd0f33ee1639932fdfad7","contract":"0x1b4357bff5a01bdf2a6581247cf9ed1e24629176","eventname":"contract call","state":{"type":"Array","value":[{"type":"ByteString","value":"dHJhbnNmZXI="},{"type":"Array","value":[{"type":"ByteString","value":"dpFiJB7t+XwkgWUq3xug9b9XQxs="},{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"Integer","value":"1000"}]}]}}]}`,
`{"jsonrpc":"2.0","method":"transaction_executed","params":[{"container":"0xf97a72b7722c109f909a8bc16c22368c5023d85828b09b127b237aace33cf099","trigger":"Application","vmstate":"HALT","gasconsumed":"6042610","stack":[],"notifications":[{"contract":"0xe65ff7b3a02d207b584a5c27057d4e9862ef01da","eventname":"contract call","state":{"type":"Array","value":[{"type":"ByteString","value":"dHJhbnNmZXI="},{"type":"Array","value":[{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"ByteString","value":"IHKCdK+vw29DoHHTKM+j5inZy7A="},{"type":"Integer","value":"123"}]}]}},{"contract":"0xe65ff7b3a02d207b584a5c27057d4e9862ef01da","eventname":"transfer","state":{"type":"Array","value":[{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"ByteString","value":"IHKCdK+vw29DoHHTKM+j5inZy7A="},{"type":"Integer","value":"123"}]}}]}]}`,
fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose),
`{"jsonrpc":"2.0","method":"event_missed","params":[]}`,
`{"jsonrpc":"2.0","method":"event_missed","params":[]}`, // the last one, will trigger receiver channels closing.
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" {
@ -163,109 +163,123 @@ func TestWSClientEvents(t *testing.T) {
return
}
}))
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cacheLock.Lock()
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
wsc.cacheLock.Unlock()
t.Run("default ntf channel", func(t *testing.T) {
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cacheLock.Lock()
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
wsc.cacheLock.Unlock()
// Our server mock is restricted, so perform subscriptions manually with default notifications channel.
wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications}
wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications}
// MissedEvent must be delivered without subscription.
wsc.subscriptionsLock.Unlock()
for range events {
select {
case _, ok = <-wsc.Notifications:
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.True(t, ok)
}
// Our server mock is restricted, so perform subscriptions manually with default notifications channel.
bCh1 := make(chan *block.Block)
bCh2 := make(chan *block.Block)
aerCh1 := make(chan *state.AppExecResult)
aerCh2 := make(chan *state.AppExecResult)
aerCh3 := make(chan *state.AppExecResult)
ntfCh := make(chan *state.ContainedNotificationEvent)
halt := "HALT"
fault := "FAULT"
wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = &blockReceiver{ch: bCh1}
wsc.receivers[chan<- *block.Block(bCh1)] = []string{"0"}
wsc.subscriptions["1"] = &blockReceiver{ch: bCh2} // two different channels subscribed for same notifications
wsc.receivers[chan<- *block.Block(bCh2)] = []string{"1"}
wsc.subscriptions["2"] = &executionNotificationReceiver{ch: ntfCh}
wsc.subscriptions["3"] = &executionNotificationReceiver{ch: ntfCh} // check duplicating subscriptions
wsc.receivers[chan<- *state.ContainedNotificationEvent(ntfCh)] = []string{"2", "3"}
wsc.subscriptions["4"] = &executionReceiver{ch: aerCh1}
wsc.receivers[chan<- *state.AppExecResult(aerCh1)] = []string{"4"}
wsc.subscriptions["5"] = &executionReceiver{filter: &neorpc.ExecutionFilter{State: &halt}, ch: aerCh2}
wsc.receivers[chan<- *state.AppExecResult(aerCh2)] = []string{"5"}
wsc.subscriptions["6"] = &executionReceiver{filter: &neorpc.ExecutionFilter{State: &fault}, ch: aerCh3}
wsc.receivers[chan<- *state.AppExecResult(aerCh3)] = []string{"6"}
// MissedEvent must close the channels above.
wsc.subscriptions["7"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["8"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications} // check duplicating subscriptions
wsc.subscriptions["9"] = &naiveReceiver{eventID: neorpc.ExecutionEventID, ch: wsc.Notifications} // check different events
wsc.receivers[wsc.Notifications] = []string{"7", "8", "9"}
wsc.subscriptionsLock.Unlock()
var (
b1Cnt, b2Cnt int
aer1Cnt, aer2Cnt, aer3Cnt int
ntfCnt int
defaultCount int
expectedb1Cnt, expectedb2Cnt = 1, 1 // single Block event
expectedaer1Cnt, expectedaer2Cnt, expectedaer3Cnt = 2, 2, 0 // two HALTED AERs
expectedntfCnt = 1 // single notification event
expectedDefaultCnt = 1 + 2 + 1 // single Block event + two AERs + missed event
aer *state.AppExecResult
)
for b1Cnt+b2Cnt+
aer1Cnt+aer2Cnt+aer3Cnt+
ntfCnt+
defaultCount !=
expectedb1Cnt+expectedb2Cnt+
expectedaer1Cnt+expectedaer2Cnt+expectedaer3Cnt+
expectedntfCnt+
expectedDefaultCnt {
select {
case _, ok = <-bCh1:
if ok {
b1Cnt++
}
case _, ok = <-bCh2:
if ok {
b2Cnt++
}
case _, ok = <-aerCh1:
if ok {
aer1Cnt++
}
case aer, ok = <-aerCh2:
if ok {
require.Equal(t, vmstate.Halt, aer.VMState)
aer2Cnt++
}
case _, ok = <-aerCh3:
if ok {
aer3Cnt++
}
case _, ok = <-ntfCh:
if ok {
ntfCnt++
}
case _, ok = <-wsc.Notifications:
if ok {
defaultCount++
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
// Connection closed by server.
require.False(t, ok)
})
t.Run("multiple ntf channels", func(t *testing.T) {
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cacheLock.Lock()
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
wsc.cacheLock.Unlock()
}
assert.Equal(t, expectedb1Cnt, b1Cnt)
assert.Equal(t, expectedb2Cnt, b2Cnt)
assert.Equal(t, expectedaer1Cnt, aer1Cnt)
assert.Equal(t, expectedaer2Cnt, aer2Cnt)
assert.Equal(t, expectedaer3Cnt, aer3Cnt)
assert.Equal(t, expectedntfCnt, ntfCnt)
assert.Equal(t, expectedDefaultCnt, defaultCount)
// Our server mock is restricted, so perform subscriptions manually with default notifications channel.
ch1 := make(chan Notification)
ch2 := make(chan Notification)
ch3 := make(chan Notification)
halt := "HALT"
fault := "FAULT"
wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications}
wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications}
wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1}
wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2}
wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions
wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &halt}, ch: ch2}
wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &fault}, ch: ch3}
// MissedEvent must be delivered without subscription.
wsc.subscriptionsLock.Unlock()
var (
defaultChCnt int
ch1Cnt int
ch2Cnt int
ch3Cnt int
expectedDefaultCnCount = len(events)
expectedCh1Cnt = 1 + 1 // Block event + Missed event
expectedCh2Cnt = 1 + 2 + 1 // Notification event + 2 Execution events + Missed event
expectedCh3Cnt = 1 // Missed event
ntf Notification
)
for i := 0; i < expectedDefaultCnCount+expectedCh1Cnt+expectedCh2Cnt+expectedCh3Cnt; i++ {
select {
case ntf, ok = <-wsc.Notifications:
defaultChCnt++
case ntf, ok = <-ch1:
require.True(t, ntf.Type == neorpc.BlockEventID || ntf.Type == neorpc.MissedEventID, ntf.Type)
ch1Cnt++
case ntf, ok = <-ch2:
require.True(t, ntf.Type == neorpc.NotificationEventID || ntf.Type == neorpc.MissedEventID || ntf.Type == neorpc.ExecutionEventID)
ch2Cnt++
case ntf, ok = <-ch3:
require.True(t, ntf.Type == neorpc.MissedEventID)
ch3Cnt++
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.True(t, ok)
}
select {
case _, ok = <-wsc.Notifications:
case _, ok = <-ch1:
case _, ok = <-ch2:
case _, ok = <-ch3:
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
// Connection closed by server.
require.False(t, ok)
require.Equal(t, expectedDefaultCnCount, defaultChCnt)
require.Equal(t, expectedCh1Cnt, ch1Cnt)
require.Equal(t, expectedCh2Cnt, ch2Cnt)
require.Equal(t, expectedCh3Cnt, ch3Cnt)
})
// Channels must be closed by server
_, ok = <-bCh1
require.False(t, ok)
_, ok = <-bCh2
require.False(t, ok)
_, ok = <-aerCh1
require.False(t, ok)
_, ok = <-aerCh2
require.False(t, ok)
_, ok = <-aerCh3
require.False(t, ok)
_, ok = <-ntfCh
require.False(t, ok)
_, ok = <-ntfCh
require.False(t, ok)
}
func TestWSExecutionVMStateCheck(t *testing.T) {
@ -276,12 +290,16 @@ func TestWSExecutionVMStateCheck(t *testing.T) {
wsc.getNextRequestID = getTestRequestID
require.NoError(t, wsc.Init())
filter := "NONE"
_, err = wsc.SubscribeForTransactionExecutionsWithChan(&filter, nil, nil)
_, err = wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &filter}, make(chan *state.AppExecResult))
require.Error(t, err)
wsc.Close()
}
func TestWSFilteredSubscriptions(t *testing.T) {
bCh := make(chan *block.Block)
txCh := make(chan *transaction.Transaction)
aerCh := make(chan *state.AppExecResult)
ntfCh := make(chan *state.ContainedNotificationEvent)
var cases = []struct {
name string
clientCode func(*testing.T, *WSClient)
@ -290,7 +308,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"blocks primary",
func(t *testing.T, wsc *WSClient) {
primary := 3
_, err := wsc.SubscribeForNewBlocksWithChan(&primary, nil, nil, nil)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Primary: &primary}, bCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -305,7 +323,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"blocks since",
func(t *testing.T, wsc *WSClient) {
var since uint32 = 3
_, err := wsc.SubscribeForNewBlocksWithChan(nil, &since, nil, nil)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -320,7 +338,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"blocks till",
func(t *testing.T, wsc *WSClient) {
var till uint32 = 3
_, err := wsc.SubscribeForNewBlocksWithChan(nil, nil, &till, nil)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Till: &till}, bCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -339,7 +357,11 @@ func TestWSFilteredSubscriptions(t *testing.T) {
primary = 2
till uint32 = 5
)
_, err := wsc.SubscribeForNewBlocksWithChan(&primary, &since, &till, nil)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{
Primary: &primary,
Since: &since,
Till: &till,
}, bCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -354,7 +376,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"transactions sender",
func(t *testing.T, wsc *WSClient) {
sender := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.SubscribeForNewTransactionsWithChan(&sender, nil, nil)
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender}, txCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -368,7 +390,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"transactions signer",
func(t *testing.T, wsc *WSClient) {
signer := util.Uint160{0, 42}
_, err := wsc.SubscribeForNewTransactionsWithChan(nil, &signer, nil)
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Signer: &signer}, txCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -383,7 +405,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
func(t *testing.T, wsc *WSClient) {
sender := util.Uint160{1, 2, 3, 4, 5}
signer := util.Uint160{0, 42}
_, err := wsc.SubscribeForNewTransactionsWithChan(&sender, &signer, nil)
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender, Signer: &signer}, txCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -397,7 +419,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"notifications contract hash",
func(t *testing.T, wsc *WSClient) {
contract := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, nil, nil)
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ntfCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -411,7 +433,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"notifications name",
func(t *testing.T, wsc *WSClient) {
name := "my_pretty_notification"
_, err := wsc.SubscribeForExecutionNotificationsWithChan(nil, &name, nil)
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Name: &name}, ntfCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -426,7 +448,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
func(t *testing.T, wsc *WSClient) {
contract := util.Uint160{1, 2, 3, 4, 5}
name := "my_pretty_notification"
_, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, &name, nil)
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract, Name: &name}, ntfCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -440,7 +462,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"executions state",
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
_, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, nil, nil)
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &state}, aerCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -454,7 +476,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"executions container",
func(t *testing.T, wsc *WSClient) {
container := util.Uint256{1, 2, 3}
_, err := wsc.SubscribeForTransactionExecutionsWithChan(nil, &container, nil)
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &container}, aerCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -469,7 +491,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
container := util.Uint256{1, 2, 3}
_, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, &container, nil)
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &state, Container: &container}, aerCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {