rpc: refactor WSClient subscriptions API

Make it more specific, close #2756.
This commit is contained in:
Anna Shaleva 2022-10-25 15:11:24 +03:00
parent 2a53db42af
commit 0a5905390c
4 changed files with 672 additions and 315 deletions

View file

@ -6,10 +6,10 @@ import (
"fmt"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
)
@ -30,6 +30,9 @@ var (
// ErrAwaitingNotSupported is returned from Wait method if Waiter instance
// doesn't support transaction awaiting.
ErrAwaitingNotSupported = errors.New("awaiting not supported")
// ErrMissedEvent is returned when RPCEventWaiter closes receiver channel
// which happens if missed event was received from the RPC server.
ErrMissedEvent = errors.New("some event was missed")
)
type (
@ -65,8 +68,8 @@ type (
RPCEventWaiter interface {
RPCPollingWaiter
SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error)
SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error)
ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error)
ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error)
Unsubscribe(id string) error
}
)
@ -223,22 +226,27 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
}
}
}()
rcvr := make(chan rpcclient.Notification)
bRcvr := make(chan *block.Block)
aerRcvr := make(chan *state.AppExecResult)
defer func() {
drainLoop:
// Drain rcvr to avoid other notification receivers blocking.
// Drain receivers to avoid other notification receivers blocking.
for {
select {
case <-rcvr:
case <-bRcvr:
case <-aerRcvr:
default:
break drainLoop
}
}
close(rcvr)
if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) {
close(bRcvr)
close(aerRcvr)
}
}()
// Execution event follows the block event, thus wait until the block next to the VUB to be sure.
since := vub + 1
blocksID, err := w.ws.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr)
blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
return
@ -256,7 +264,7 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
}
}()
for _, h := range hashes {
txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr)
txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return
@ -275,27 +283,25 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
}()
}
for {
select {
case ntf := <-rcvr:
switch ntf.Type {
case neorpc.BlockEventID:
waitErr = ErrTxNotAccepted
return
case neorpc.ExecutionEventID:
res = ntf.Value.(*state.AppExecResult)
return
case neorpc.MissedEventID:
// We're toast, retry with non-ws client.
wsWaitErr = errors.New("some event was missed")
return
}
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
return
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
select {
case _, ok := <-bRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
return
}
waitErr = ErrTxNotAccepted
case aer, ok := <-aerRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
return
}
res = aer
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
}
return
}

View file

@ -11,7 +11,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
)
@ -20,20 +19,20 @@ type AwaitableRPCClient struct {
RPCClient
chLock sync.RWMutex
subBlockCh chan<- rpcclient.Notification
subTxCh chan<- rpcclient.Notification
subBlockCh chan<- *block.Block
subTxCh chan<- *state.AppExecResult
}
func (c *AwaitableRPCClient) SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) {
func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
c.chLock.Lock()
defer c.chLock.Unlock()
c.subBlockCh = rcvrCh
c.subBlockCh = rcvr
return "1", nil
}
func (c *AwaitableRPCClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) {
func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) {
c.chLock.Lock()
defer c.chLock.Unlock()
c.subTxCh = rcvrCh
c.subTxCh = rcvr
return "2", nil
}
func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil }
@ -163,10 +162,7 @@ func TestWSWaiter_Wait(t *testing.T) {
check(t, func() {
c.chLock.RLock()
defer c.chLock.RUnlock()
c.subBlockCh <- rpcclient.Notification{
Type: neorpc.ExecutionEventID,
Value: expected,
}
c.subTxCh <- expected
})
// Missing AER after VUB.
@ -178,9 +174,6 @@ func TestWSWaiter_Wait(t *testing.T) {
check(t, func() {
c.chLock.RLock()
defer c.chLock.RUnlock()
c.subBlockCh <- rpcclient.Notification{
Type: neorpc.BlockEventID,
Value: &block.Block{},
}
c.subBlockCh <- &block.Block{}
})
}

View file

@ -34,6 +34,11 @@ type WSClient struct {
// WSClient to block even regular requests. This channel is not buffered.
// In case of protocol error or upon connection closure, this channel will
// be closed, so make sure to handle this.
//
// Deprecated: please, use custom channels with ReceiveBlocks, ReceiveTransactions,
// ReceiveExecutionNotifications, ReceiveExecutions, ReceiveNotaryRequests
// methods to subscribe for notifications. This field will be removed in future
// versions.
Notifications chan Notification
ws *websocket.Conn
@ -47,29 +52,261 @@ type WSClient struct {
subscriptionsLock sync.RWMutex
subscriptions map[string]notificationReceiver
// receivers is a mapping from receiver channel to a set of corresponding subscription IDs.
// It must be accessed with subscriptionsLock taken. Its keys must be used to deliver
// notifications, if channel is not in the receivers list and corresponding subscription
// still exists, notification must not be sent.
receivers map[interface{}][]string
respLock sync.RWMutex
respChannels map[uint64]chan *neorpc.Response
}
// notificationReceiver is a server events receiver. It stores desired notifications ID
// and filter and a channel used to receive matching notifications.
type notificationReceiver struct {
typ neorpc.EventID
filter interface{}
ch chan<- Notification
// notificationReceiver is an interface aimed to provide WS subscriber functionality
// for different types of subscriptions.
type notificationReceiver interface {
// Comparator provides notification filtering functionality.
rpcevent.Comparator
// Receiver returns notification receiver channel.
Receiver() interface{}
// TrySend checks whether notification passes receiver filter and sends it
// to the underlying channel if so.
TrySend(ntf Notification) bool
// Close closes underlying receiver channel.
Close()
}
// EventID implements neorpc.Comparator interface and returns notification ID.
func (r notificationReceiver) EventID() neorpc.EventID {
return r.typ
// blockReceiver stores information about block events subscriber.
type blockReceiver struct {
filter *neorpc.BlockFilter
ch chan<- *block.Block
}
// Filter implements neorpc.Comparator interface and returns notification filter.
func (r notificationReceiver) Filter() interface{} {
// EventID implements neorpc.Comparator interface.
func (r *blockReceiver) EventID() neorpc.EventID {
return neorpc.BlockEventID
}
// Filter implements neorpc.Comparator interface.
func (r *blockReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *blockReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *blockReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*block.Block)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *blockReceiver) Close() {
close(r.ch)
}
// txReceiver stores information about transaction events subscriber.
type txReceiver struct {
filter *neorpc.TxFilter
ch chan<- *transaction.Transaction
}
// EventID implements neorpc.Comparator interface.
func (r *txReceiver) EventID() neorpc.EventID {
return neorpc.TransactionEventID
}
// Filter implements neorpc.Comparator interface.
func (r *txReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *txReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *txReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*transaction.Transaction)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *txReceiver) Close() {
close(r.ch)
}
// executionNotificationReceiver stores information about execution notifications subscriber.
type executionNotificationReceiver struct {
filter *neorpc.NotificationFilter
ch chan<- *state.ContainedNotificationEvent
}
// EventID implements neorpc.Comparator interface.
func (r *executionNotificationReceiver) EventID() neorpc.EventID {
return neorpc.NotificationEventID
}
// Filter implements neorpc.Comparator interface.
func (r *executionNotificationReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *executionNotificationReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *executionNotificationReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*state.ContainedNotificationEvent)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *executionNotificationReceiver) Close() {
close(r.ch)
}
// executionReceiver stores information about application execution results subscriber.
type executionReceiver struct {
filter *neorpc.ExecutionFilter
ch chan<- *state.AppExecResult
}
// EventID implements neorpc.Comparator interface.
func (r *executionReceiver) EventID() neorpc.EventID {
return neorpc.ExecutionEventID
}
// Filter implements neorpc.Comparator interface.
func (r *executionReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *executionReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *executionReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*state.AppExecResult)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *executionReceiver) Close() {
close(r.ch)
}
// notaryRequestReceiver stores information about notary requests subscriber.
type notaryRequestReceiver struct {
filter *neorpc.TxFilter
ch chan<- *result.NotaryRequestEvent
}
// EventID implements neorpc.Comparator interface.
func (r *notaryRequestReceiver) EventID() neorpc.EventID {
return neorpc.NotaryRequestEventID
}
// Filter implements neorpc.Comparator interface.
func (r *notaryRequestReceiver) Filter() interface{} {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *notaryRequestReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *notaryRequestReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf.Value.(*result.NotaryRequestEvent)
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *notaryRequestReceiver) Close() {
close(r.ch)
}
// naiveReceiver is a structure leaved for deprecated single channel based notifications
// delivering.
//
// Deprecated: this receiver must be removed after outdated subscriptions API removal.
type naiveReceiver struct {
eventID neorpc.EventID
filter interface{}
ch chan<- Notification
}
// EventID implements neorpc.Comparator interface.
func (r *naiveReceiver) EventID() neorpc.EventID {
return r.eventID
}
// Filter implements neorpc.Comparator interface.
func (r *naiveReceiver) Filter() interface{} {
return r.filter
}
// Receiver implements notificationReceiver interface.
func (r *naiveReceiver) Receiver() interface{} {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *naiveReceiver) TrySend(ntf Notification) bool {
if rpcevent.Matches(r, ntf) {
r.ch <- ntf
return true
}
return false
}
// Close implements notificationReceiver interface.
func (r *naiveReceiver) Close() {
r.ch <- Notification{
Type: neorpc.MissedEventID, // backwards-compatible behaviour
}
}
// Notification represents a server-generated notification for client subscriptions.
// Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent
// *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type.
@ -111,6 +348,9 @@ const (
wsWriteLimit = wsPingPeriod / 2
)
// ErrNilNotificationReceiver is returned when notification receiver channel is nil.
var ErrNilNotificationReceiver = errors.New("nil notification receiver")
// errConnClosedByUser is a WSClient error used iff the user calls (*WSClient).Close method by himself.
var errConnClosedByUser = errors.New("connection closed by user")
@ -138,6 +378,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error
respChannels: make(map[uint64]chan *neorpc.Response),
requests: make(chan *neorpc.Request),
subscriptions: make(map[string]notificationReceiver),
receivers: make(map[interface{}][]string),
}
err = initClient(ctx, &wsc.Client, endpoint, opts)
@ -237,13 +478,22 @@ readloop:
break readloop
}
}
ok := make(map[chan<- Notification]bool)
if event == neorpc.MissedEventID {
c.subscriptionsLock.Lock()
for rcvr, ids := range c.receivers {
c.subscriptions[ids[0]].Close()
delete(c.receivers, rcvr)
}
c.subscriptionsLock.Unlock()
continue readloop
}
c.subscriptionsLock.RLock()
for _, rcvr := range c.subscriptions {
ntf := Notification{Type: event, Value: val}
if (rpcevent.Matches(rcvr, ntf) || event == neorpc.MissedEventID /*missed event must be delivered to each receiver*/) && !ok[rcvr.ch] {
ok[rcvr.ch] = true // strictly one notification per channel
rcvr.ch <- ntf // this will block other receivers
ntf := Notification{Type: event, Value: val}
for _, ids := range c.receivers {
for _, id := range ids {
if c.subscriptions[id].TrySend(ntf) {
break // strictly one notification per channel
}
}
}
c.subscriptionsLock.RUnlock()
@ -370,92 +620,110 @@ func (c *WSClient) performSubscription(params []interface{}, rcvr notificationRe
defer c.subscriptionsLock.Unlock()
c.subscriptions[resp] = rcvr
ch := rcvr.Receiver()
c.receivers[ch] = append(c.receivers[ch], resp)
return resp, nil
}
func (c *WSClient) performUnsubscription(id string) error {
var resp bool
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
if _, ok := c.subscriptions[id]; !ok {
return errors.New("no subscription with this ID")
}
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
delete(c.subscriptions, id)
return nil
return c.removeSubscription(id)
}
// SubscribeForNewBlocks adds subscription for new block events to this instance
// of the client. It can be filtered by primary consensus node index and/or block
// index allowing to receive blocks since the specified index only, nil value is
// treated as missing filter.
// index allowing to receive blocks since/till the specified index only, nil value
// is treated as missing filter.
//
// Deprecated: please, use SubscribeForNewBlocksWithChan. This method will be removed in future versions.
// Deprecated: please, use ReceiveBlocks. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) {
return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications)
}
// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the
// new block events. Events can be filtered by primary consensus node index, nil
// value doesn't add any filters. If the receiver channel is nil, then the default
// Notifications channel will be used. The receiver channel must be properly read
// and drained after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
params := []interface{}{"block_added"}
var flt *neorpc.BlockFilter
if primary != nil || sinceIndex != nil || tillIndex != nil {
flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex}
params = append(params, flt)
}
rcvr := notificationReceiver{
typ: neorpc.BlockEventID,
params := []interface{}{"block_added"}
if flt != nil {
params = append(params, *flt)
}
r := &naiveReceiver{
eventID: neorpc.BlockEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, r)
}
// ReceiveBlocks registers provided channel as a receiver for the new block events.
// Events can be filtered by the given BlockFilter, nil value doesn't add any filter.
// The receiver channel must be properly read and drained after usage in order not
// to block other notification receivers. If multiple subscriptions share the same
// receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"block_added"}
if flt != nil {
params = append(params, *flt)
}
r := &blockReceiver{
filter: flt,
ch: rcvrCh,
ch: rcvr,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// SubscribeForNewTransactions adds subscription for new transaction events to
// this instance of the client. It can be filtered by the sender and/or the signer, nil
// value is treated as missing filter.
//
// Deprecated: please, use SubscribeForNewTransactionsWithChan. This method will be removed in future versions.
// Deprecated: please, use ReceiveTransactions. This method will be removed in future versions.
func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) {
return c.SubscribeForNewTransactionsWithChan(sender, signer, c.Notifications)
}
// SubscribeForNewTransactionsWithChan registers provided channel as a receiver
// for new transaction events. Events can be filtered by the sender and/or the
// signer, nil value is treated as missing filter. If the receiver channel is nil,
// then the default Notifications channel will be used. The receiver channel must be
// properly read and drained after usage in order not to block other notification
// receivers.
func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, signer *util.Uint160, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
params := []interface{}{"transaction_added"}
var flt *neorpc.TxFilter
if sender != nil || signer != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: signer}
}
params := []interface{}{"transaction_added"}
if flt != nil {
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.TransactionEventID,
filter: flt,
ch: rcvrCh,
r := &naiveReceiver{
eventID: neorpc.TransactionEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// ReceiveTransactions registers provided channel as a receiver for new transaction
// events. Events can be filtered by the given TxFilter, nil value doesn't add any
// filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share
// the same receiver channel, then matching notification is only sent once per channel.
// The receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveTransactions(flt *neorpc.TxFilter, rcvr chan<- *transaction.Transaction) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"transaction_added"}
if flt != nil {
params = append(params, *flt)
}
r := &txReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForExecutionNotifications adds subscription for notifications
@ -463,33 +731,45 @@ func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, sig
// filtered by the contract's hash (that emits notifications), nil value puts no such
// restrictions.
//
// Deprecated: please, use SubscribeForExecutionNotificationsWithChan. This method will be removed in future versions.
// Deprecated: please, use ReceiveExecutionNotifications. This method will be removed in future versions.
func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) {
return c.SubscribeForExecutionNotificationsWithChan(contract, name, c.Notifications)
}
// SubscribeForExecutionNotificationsWithChan registers provided channel as a
// receiver for execution events. Events can be filtered by the contract's hash
// (that emits notifications), nil value puts no such restrictions. If the
// receiver channel is nil, then the default Notifications channel will be used.
// The receiver channel must be properly read and drained after usage in order
// not to block other notification receivers.
func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uint160, name *string, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
}
params := []interface{}{"notification_from_execution"}
var flt *neorpc.NotificationFilter
if contract != nil || name != nil {
flt = &neorpc.NotificationFilter{Contract: contract, Name: name}
}
params := []interface{}{"notification_from_execution"}
if flt != nil {
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.NotificationEventID,
filter: flt,
ch: rcvrCh,
r := &naiveReceiver{
eventID: neorpc.NotificationEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// ReceiveExecutionNotifications registers provided channel as a receiver for execution
// events. Events can be filtered by the given NotificationFilter, nil value doesn't add
// any filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share the
// same receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"notification_from_execution"}
if flt != nil {
params = append(params, *flt)
}
r := &executionNotificationReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForTransactionExecutions adds subscription for application execution
@ -498,39 +778,56 @@ func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uin
// transactions; it can also be filtered by script container hash.
// nil value means no filtering.
//
// Deprecated: please, use SubscribeForTransactionExecutionsWithChan. This method will be removed in future versions.
func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) {
return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications)
}
// SubscribeForTransactionExecutionsWithChan registers provided channel as a
// receiver for application execution result events generated during transaction
// execution. Events can be filtered by state (HALT/FAULT) to check for successful
// or failing transactions; it can also be filtered by script container hash.
// nil value means no filtering. If the receiver channel is nil, then the default
// Notifications channel will be used. The receiver channel must be properly read
// and drained after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
// Deprecated: please, use ReceiveExecutions. This method will be removed in future versions.
func (c *WSClient) SubscribeForTransactionExecutions(vmState *string, container *util.Uint256) (string, error) {
var flt *neorpc.ExecutionFilter
if vmState != nil || container != nil {
flt = &neorpc.ExecutionFilter{State: vmState, Container: container}
}
params := []interface{}{"transaction_executed"}
var flt *neorpc.ExecutionFilter
if state != nil || container != nil {
if state != nil {
if *state != "HALT" && *state != "FAULT" {
if flt != nil {
if flt.State != nil {
if *flt.State != "HALT" && *flt.State != "FAULT" {
return "", errors.New("bad state parameter")
}
}
flt = &neorpc.ExecutionFilter{State: state, Container: container}
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.ExecutionEventID,
filter: flt,
ch: rcvrCh,
r := &naiveReceiver{
eventID: neorpc.ExecutionEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// ReceiveExecutions registers provided channel as a receiver for
// application execution result events generated during transaction execution.
// Events can be filtered by the given ExecutionFilter, nil value doesn't add any filter.
// The receiver channel must be properly read and drained after usage in order not
// to block other notification receivers. If multiple subscriptions share the same
// receiver channel, then matching notification is only sent once per channel. The
// receiver channel will be closed by the WSClient immediately after MissedEvent is
// received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"transaction_executed"}
if flt != nil {
if flt.State != nil {
if *flt.State != "HALT" && *flt.State != "FAULT" {
return "", errors.New("bad state parameter")
}
}
params = append(params, *flt)
}
r := &executionReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// SubscribeForNotaryRequests adds subscription for notary request payloads
@ -538,33 +835,47 @@ func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, cont
// request sender's hash, or main tx signer's hash, nil value puts no such
// restrictions.
//
// Deprecated: please, use SubscribeForNotaryRequestsWithChan. This method will be removed in future versions.
// Deprecated: please, use ReceiveNotaryRequests. This method will be removed in future versions.
func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) {
return c.SubscribeForNotaryRequestsWithChan(sender, mainSigner, c.Notifications)
}
// SubscribeForNotaryRequestsWithChan registers provided channel as a receiver
// for notary request payload addition or removal events. It can be filtered by
// request sender's hash, or main tx signer's hash, nil value puts no such
// restrictions. If the receiver channel is nil, then the default Notifications
// channel will be used. The receiver channel must be properly read and drained
// after usage in order not to block other notification receivers.
func (c *WSClient) SubscribeForNotaryRequestsWithChan(sender *util.Uint160, mainSigner *util.Uint160, rcvrCh chan<- Notification) (string, error) {
if rcvrCh == nil {
rcvrCh = c.Notifications
var flt *neorpc.TxFilter
if sender != nil || mainSigner != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner}
}
params := []interface{}{"notary_request_event"}
var flt *neorpc.TxFilter
if sender != nil {
flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner}
if flt != nil {
params = append(params, *flt)
}
rcvr := notificationReceiver{
typ: neorpc.NotaryRequestEventID,
filter: flt,
ch: rcvrCh,
r := &naiveReceiver{
eventID: neorpc.NotaryRequestEventID,
filter: flt,
ch: c.Notifications,
}
return c.performSubscription(params, rcvr)
return c.performSubscription(params, r)
}
// ReceiveNotaryRequests registers provided channel as a receiver for notary request
// payload addition or removal events. Events can be filtered by the given TxFilter
// where sender corresponds to notary request sender (the second fallback transaction
// signer) and signer corresponds to main transaction signers. nil value doesn't add
// any filter. The receiver channel must be properly read and drained after usage in
// order not to block other notification receivers. If multiple subscriptions share
// the same receiver channel, then matching notification is only sent once per channel.
// The receiver channel will be closed by the WSClient immediately after MissedEvent
// is received from the server; no unsubscription is performed in this case, so it's the
// user responsibility to unsubscribe.
func (c *WSClient) ReceiveNotaryRequests(flt *neorpc.TxFilter, rcvr chan<- *result.NotaryRequestEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []interface{}{"notary_request_event"}
if flt != nil {
params = append(params, *flt)
}
r := &notaryRequestReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// Unsubscribe removes subscription for the given event stream.
@ -578,18 +889,43 @@ func (c *WSClient) UnsubscribeAll() error {
defer c.subscriptionsLock.Unlock()
for id := range c.subscriptions {
var resp bool
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
err := c.removeSubscription(id)
if err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
delete(c.subscriptions, id)
}
return nil
}
// removeSubscription is internal method that removes subscription with the given
// ID from the list of subscriptions and receivers. It must be performed under
// subscriptions lock.
func (c *WSClient) removeSubscription(id string) error {
var resp bool
if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
rcvr := c.subscriptions[id]
ch := rcvr.Receiver()
ids := c.receivers[ch]
for i, rcvrID := range ids {
if rcvrID == id {
ids = append(ids[:i], ids[i+1:]...)
break
}
}
if len(ids) == 0 {
delete(c.receivers, ch)
} else {
c.receivers[ch] = ids
}
delete(c.subscriptions, id)
return nil
}
// setCloseErr is a thread-safe method setting closeErr in case if it's not yet set.
func (c *WSClient) setCloseErr(err error) {
c.closeErrLock.Lock()

View file

@ -15,11 +15,16 @@ import (
"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/services/rpcsrv/params"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
@ -32,31 +37,26 @@ func TestWSClientClose(t *testing.T) {
}
func TestWSClientSubscription(t *testing.T) {
ch := make(chan Notification)
bCh := make(chan *block.Block)
txCh := make(chan *transaction.Transaction)
aerCh := make(chan *state.AppExecResult)
ntfCh := make(chan *state.ContainedNotificationEvent)
ntrCh := make(chan *result.NotaryRequestEvent)
var cases = map[string]func(*WSClient) (string, error){
"blocks": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, nil)
},
"blocks_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch)
return wsc.ReceiveBlocks(nil, bCh)
},
"transactions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewTransactionsWithChan(nil, nil, nil)
},
"transactions_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForNewTransactionsWithChan(nil, nil, ch)
return wsc.ReceiveTransactions(nil, txCh)
},
"notifications": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, nil)
},
"notifications_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch)
return wsc.ReceiveExecutionNotifications(nil, ntfCh)
},
"executions": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, nil)
return wsc.ReceiveExecutions(nil, aerCh)
},
"executions_with_custom_ch": func(wsc *WSClient) (string, error) {
return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch)
"notary requests": func(wsc *WSClient) (string, error) {
return wsc.ReceiveNotaryRequests(nil, ntrCh)
},
}
t.Run("good", func(t *testing.T) {
@ -96,13 +96,13 @@ func TestWSClientUnsubscription(t *testing.T) {
var cases = map[string]responseCheck{
"good": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = notificationReceiver{}
wsc.subscriptions["0"] = &blockReceiver{}
err := wsc.Unsubscribe("0")
require.NoError(t, err)
}},
"all": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = notificationReceiver{}
wsc.subscriptions["0"] = &blockReceiver{}
err := wsc.UnsubscribeAll()
require.NoError(t, err)
require.Equal(t, 0, len(wsc.subscriptions))
@ -113,13 +113,13 @@ func TestWSClientUnsubscription(t *testing.T) {
}},
"error returned": {`{"jsonrpc": "2.0", "id": 1, "error":{"code":-32602,"message":"Invalid Params"}}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = notificationReceiver{}
wsc.subscriptions["0"] = &blockReceiver{}
err := wsc.Unsubscribe("0")
require.Error(t, err)
}},
"false returned": {`{"jsonrpc": "2.0", "id": 1, "result": false}`, func(t *testing.T, wsc *WSClient) {
// We can't really subscribe using this stub server, so set up wsc internals.
wsc.subscriptions["0"] = notificationReceiver{}
wsc.subscriptions["0"] = &blockReceiver{}
err := wsc.Unsubscribe("0")
require.Error(t, err)
}},
@ -144,7 +144,7 @@ func TestWSClientEvents(t *testing.T) {
`{"jsonrpc":"2.0","method":"notification_from_execution","params":[{"container":"0xe1cd5e57e721d2a2e05fb1f08721b12057b25ab1dd7fd0f33ee1639932fdfad7","contract":"0x1b4357bff5a01bdf2a6581247cf9ed1e24629176","eventname":"contract call","state":{"type":"Array","value":[{"type":"ByteString","value":"dHJhbnNmZXI="},{"type":"Array","value":[{"type":"ByteString","value":"dpFiJB7t+XwkgWUq3xug9b9XQxs="},{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"Integer","value":"1000"}]}]}}]}`,
`{"jsonrpc":"2.0","method":"transaction_executed","params":[{"container":"0xf97a72b7722c109f909a8bc16c22368c5023d85828b09b127b237aace33cf099","trigger":"Application","vmstate":"HALT","gasconsumed":"6042610","stack":[],"notifications":[{"contract":"0xe65ff7b3a02d207b584a5c27057d4e9862ef01da","eventname":"contract call","state":{"type":"Array","value":[{"type":"ByteString","value":"dHJhbnNmZXI="},{"type":"Array","value":[{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"ByteString","value":"IHKCdK+vw29DoHHTKM+j5inZy7A="},{"type":"Integer","value":"123"}]}]}},{"contract":"0xe65ff7b3a02d207b584a5c27057d4e9862ef01da","eventname":"transfer","state":{"type":"Array","value":[{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"ByteString","value":"IHKCdK+vw29DoHHTKM+j5inZy7A="},{"type":"Integer","value":"123"}]}}]}]}`,
fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose),
`{"jsonrpc":"2.0","method":"event_missed","params":[]}`,
`{"jsonrpc":"2.0","method":"event_missed","params":[]}`, // the last one, will trigger receiver channels closing.
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/ws" && req.Method == "GET" {
@ -163,109 +163,123 @@ func TestWSClientEvents(t *testing.T) {
return
}
}))
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cacheLock.Lock()
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
wsc.cacheLock.Unlock()
t.Run("default ntf channel", func(t *testing.T) {
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cacheLock.Lock()
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
wsc.cacheLock.Unlock()
// Our server mock is restricted, so perform subscriptions manually with default notifications channel.
wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications}
wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications}
// MissedEvent must be delivered without subscription.
wsc.subscriptionsLock.Unlock()
for range events {
select {
case _, ok = <-wsc.Notifications:
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.True(t, ok)
}
// Our server mock is restricted, so perform subscriptions manually with default notifications channel.
bCh1 := make(chan *block.Block)
bCh2 := make(chan *block.Block)
aerCh1 := make(chan *state.AppExecResult)
aerCh2 := make(chan *state.AppExecResult)
aerCh3 := make(chan *state.AppExecResult)
ntfCh := make(chan *state.ContainedNotificationEvent)
halt := "HALT"
fault := "FAULT"
wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = &blockReceiver{ch: bCh1}
wsc.receivers[chan<- *block.Block(bCh1)] = []string{"0"}
wsc.subscriptions["1"] = &blockReceiver{ch: bCh2} // two different channels subscribed for same notifications
wsc.receivers[chan<- *block.Block(bCh2)] = []string{"1"}
wsc.subscriptions["2"] = &executionNotificationReceiver{ch: ntfCh}
wsc.subscriptions["3"] = &executionNotificationReceiver{ch: ntfCh} // check duplicating subscriptions
wsc.receivers[chan<- *state.ContainedNotificationEvent(ntfCh)] = []string{"2", "3"}
wsc.subscriptions["4"] = &executionReceiver{ch: aerCh1}
wsc.receivers[chan<- *state.AppExecResult(aerCh1)] = []string{"4"}
wsc.subscriptions["5"] = &executionReceiver{filter: &neorpc.ExecutionFilter{State: &halt}, ch: aerCh2}
wsc.receivers[chan<- *state.AppExecResult(aerCh2)] = []string{"5"}
wsc.subscriptions["6"] = &executionReceiver{filter: &neorpc.ExecutionFilter{State: &fault}, ch: aerCh3}
wsc.receivers[chan<- *state.AppExecResult(aerCh3)] = []string{"6"}
// MissedEvent must close the channels above.
wsc.subscriptions["7"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["8"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications} // check duplicating subscriptions
wsc.subscriptions["9"] = &naiveReceiver{eventID: neorpc.ExecutionEventID, ch: wsc.Notifications} // check different events
wsc.receivers[wsc.Notifications] = []string{"7", "8", "9"}
wsc.subscriptionsLock.Unlock()
var (
b1Cnt, b2Cnt int
aer1Cnt, aer2Cnt, aer3Cnt int
ntfCnt int
defaultCount int
expectedb1Cnt, expectedb2Cnt = 1, 1 // single Block event
expectedaer1Cnt, expectedaer2Cnt, expectedaer3Cnt = 2, 2, 0 // two HALTED AERs
expectedntfCnt = 1 // single notification event
expectedDefaultCnt = 1 + 2 + 1 // single Block event + two AERs + missed event
aer *state.AppExecResult
)
for b1Cnt+b2Cnt+
aer1Cnt+aer2Cnt+aer3Cnt+
ntfCnt+
defaultCount !=
expectedb1Cnt+expectedb2Cnt+
expectedaer1Cnt+expectedaer2Cnt+expectedaer3Cnt+
expectedntfCnt+
expectedDefaultCnt {
select {
case _, ok = <-bCh1:
if ok {
b1Cnt++
}
case _, ok = <-bCh2:
if ok {
b2Cnt++
}
case _, ok = <-aerCh1:
if ok {
aer1Cnt++
}
case aer, ok = <-aerCh2:
if ok {
require.Equal(t, vmstate.Halt, aer.VMState)
aer2Cnt++
}
case _, ok = <-aerCh3:
if ok {
aer3Cnt++
}
case _, ok = <-ntfCh:
if ok {
ntfCnt++
}
case _, ok = <-wsc.Notifications:
if ok {
defaultCount++
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
// Connection closed by server.
require.False(t, ok)
})
t.Run("multiple ntf channels", func(t *testing.T) {
wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{})
require.NoError(t, err)
wsc.getNextRequestID = getTestRequestID
wsc.cacheLock.Lock()
wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually.
wsc.cache.network = netmode.UnitTestNet
wsc.cacheLock.Unlock()
}
assert.Equal(t, expectedb1Cnt, b1Cnt)
assert.Equal(t, expectedb2Cnt, b2Cnt)
assert.Equal(t, expectedaer1Cnt, aer1Cnt)
assert.Equal(t, expectedaer2Cnt, aer2Cnt)
assert.Equal(t, expectedaer3Cnt, aer3Cnt)
assert.Equal(t, expectedntfCnt, ntfCnt)
assert.Equal(t, expectedDefaultCnt, defaultCount)
// Our server mock is restricted, so perform subscriptions manually with default notifications channel.
ch1 := make(chan Notification)
ch2 := make(chan Notification)
ch3 := make(chan Notification)
halt := "HALT"
fault := "FAULT"
wsc.subscriptionsLock.Lock()
wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications}
wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications}
wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications}
wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1}
wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2}
wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions
wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &halt}, ch: ch2}
wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &fault}, ch: ch3}
// MissedEvent must be delivered without subscription.
wsc.subscriptionsLock.Unlock()
var (
defaultChCnt int
ch1Cnt int
ch2Cnt int
ch3Cnt int
expectedDefaultCnCount = len(events)
expectedCh1Cnt = 1 + 1 // Block event + Missed event
expectedCh2Cnt = 1 + 2 + 1 // Notification event + 2 Execution events + Missed event
expectedCh3Cnt = 1 // Missed event
ntf Notification
)
for i := 0; i < expectedDefaultCnCount+expectedCh1Cnt+expectedCh2Cnt+expectedCh3Cnt; i++ {
select {
case ntf, ok = <-wsc.Notifications:
defaultChCnt++
case ntf, ok = <-ch1:
require.True(t, ntf.Type == neorpc.BlockEventID || ntf.Type == neorpc.MissedEventID, ntf.Type)
ch1Cnt++
case ntf, ok = <-ch2:
require.True(t, ntf.Type == neorpc.NotificationEventID || ntf.Type == neorpc.MissedEventID || ntf.Type == neorpc.ExecutionEventID)
ch2Cnt++
case ntf, ok = <-ch3:
require.True(t, ntf.Type == neorpc.MissedEventID)
ch3Cnt++
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.True(t, ok)
}
select {
case _, ok = <-wsc.Notifications:
case _, ok = <-ch1:
case _, ok = <-ch2:
case _, ok = <-ch3:
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
// Connection closed by server.
require.False(t, ok)
require.Equal(t, expectedDefaultCnCount, defaultChCnt)
require.Equal(t, expectedCh1Cnt, ch1Cnt)
require.Equal(t, expectedCh2Cnt, ch2Cnt)
require.Equal(t, expectedCh3Cnt, ch3Cnt)
})
// Channels must be closed by server
_, ok = <-bCh1
require.False(t, ok)
_, ok = <-bCh2
require.False(t, ok)
_, ok = <-aerCh1
require.False(t, ok)
_, ok = <-aerCh2
require.False(t, ok)
_, ok = <-aerCh3
require.False(t, ok)
_, ok = <-ntfCh
require.False(t, ok)
_, ok = <-ntfCh
require.False(t, ok)
}
func TestWSExecutionVMStateCheck(t *testing.T) {
@ -276,12 +290,16 @@ func TestWSExecutionVMStateCheck(t *testing.T) {
wsc.getNextRequestID = getTestRequestID
require.NoError(t, wsc.Init())
filter := "NONE"
_, err = wsc.SubscribeForTransactionExecutionsWithChan(&filter, nil, nil)
_, err = wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &filter}, make(chan *state.AppExecResult))
require.Error(t, err)
wsc.Close()
}
func TestWSFilteredSubscriptions(t *testing.T) {
bCh := make(chan *block.Block)
txCh := make(chan *transaction.Transaction)
aerCh := make(chan *state.AppExecResult)
ntfCh := make(chan *state.ContainedNotificationEvent)
var cases = []struct {
name string
clientCode func(*testing.T, *WSClient)
@ -290,7 +308,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"blocks primary",
func(t *testing.T, wsc *WSClient) {
primary := 3
_, err := wsc.SubscribeForNewBlocksWithChan(&primary, nil, nil, nil)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Primary: &primary}, bCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -305,7 +323,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"blocks since",
func(t *testing.T, wsc *WSClient) {
var since uint32 = 3
_, err := wsc.SubscribeForNewBlocksWithChan(nil, &since, nil, nil)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -320,7 +338,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"blocks till",
func(t *testing.T, wsc *WSClient) {
var till uint32 = 3
_, err := wsc.SubscribeForNewBlocksWithChan(nil, nil, &till, nil)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Till: &till}, bCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -339,7 +357,11 @@ func TestWSFilteredSubscriptions(t *testing.T) {
primary = 2
till uint32 = 5
)
_, err := wsc.SubscribeForNewBlocksWithChan(&primary, &since, &till, nil)
_, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{
Primary: &primary,
Since: &since,
Till: &till,
}, bCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -354,7 +376,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"transactions sender",
func(t *testing.T, wsc *WSClient) {
sender := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.SubscribeForNewTransactionsWithChan(&sender, nil, nil)
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender}, txCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -368,7 +390,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"transactions signer",
func(t *testing.T, wsc *WSClient) {
signer := util.Uint160{0, 42}
_, err := wsc.SubscribeForNewTransactionsWithChan(nil, &signer, nil)
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Signer: &signer}, txCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -383,7 +405,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
func(t *testing.T, wsc *WSClient) {
sender := util.Uint160{1, 2, 3, 4, 5}
signer := util.Uint160{0, 42}
_, err := wsc.SubscribeForNewTransactionsWithChan(&sender, &signer, nil)
_, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender, Signer: &signer}, txCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -397,7 +419,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"notifications contract hash",
func(t *testing.T, wsc *WSClient) {
contract := util.Uint160{1, 2, 3, 4, 5}
_, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, nil, nil)
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ntfCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -411,7 +433,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"notifications name",
func(t *testing.T, wsc *WSClient) {
name := "my_pretty_notification"
_, err := wsc.SubscribeForExecutionNotificationsWithChan(nil, &name, nil)
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Name: &name}, ntfCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -426,7 +448,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
func(t *testing.T, wsc *WSClient) {
contract := util.Uint160{1, 2, 3, 4, 5}
name := "my_pretty_notification"
_, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, &name, nil)
_, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract, Name: &name}, ntfCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -440,7 +462,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"executions state",
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
_, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, nil, nil)
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &state}, aerCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -454,7 +476,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
{"executions container",
func(t *testing.T, wsc *WSClient) {
container := util.Uint256{1, 2, 3}
_, err := wsc.SubscribeForTransactionExecutionsWithChan(nil, &container, nil)
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &container}, aerCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
@ -469,7 +491,7 @@ func TestWSFilteredSubscriptions(t *testing.T) {
func(t *testing.T, wsc *WSClient) {
state := "FAULT"
container := util.Uint256{1, 2, 3}
_, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, &container, nil)
_, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &state, Container: &container}, aerCh)
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {