From 0a5905390c4bc3cb2dcd7b8d7d9616b2f43223fb Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 25 Oct 2022 15:11:24 +0300 Subject: [PATCH] rpc: refactor WSClient subscriptions API Make it more specific, close #2756. --- pkg/rpcclient/actor/waiter.go | 64 +-- pkg/rpcclient/actor/waiter_test.go | 23 +- pkg/rpcclient/wsclient.go | 616 ++++++++++++++++++++++------- pkg/rpcclient/wsclient_test.go | 284 +++++++------ 4 files changed, 672 insertions(+), 315 deletions(-) diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 38685bc90..4eae26f81 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -6,10 +6,10 @@ import ( "fmt" "time" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" ) @@ -30,6 +30,9 @@ var ( // ErrAwaitingNotSupported is returned from Wait method if Waiter instance // doesn't support transaction awaiting. ErrAwaitingNotSupported = errors.New("awaiting not supported") + // ErrMissedEvent is returned when RPCEventWaiter closes receiver channel + // which happens if missed event was received from the RPC server. + ErrMissedEvent = errors.New("some event was missed") ) type ( @@ -65,8 +68,8 @@ type ( RPCEventWaiter interface { RPCPollingWaiter - SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) - SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) + ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) + ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) Unsubscribe(id string) error } ) @@ -223,22 +226,27 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui } } }() - rcvr := make(chan rpcclient.Notification) + bRcvr := make(chan *block.Block) + aerRcvr := make(chan *state.AppExecResult) defer func() { drainLoop: - // Drain rcvr to avoid other notification receivers blocking. + // Drain receivers to avoid other notification receivers blocking. for { select { - case <-rcvr: + case <-bRcvr: + case <-aerRcvr: default: break drainLoop } } - close(rcvr) + if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) { + close(bRcvr) + close(aerRcvr) + } }() // Execution event follows the block event, thus wait until the block next to the VUB to be sure. since := vub + 1 - blocksID, err := w.ws.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) + blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return @@ -256,7 +264,7 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui } }() for _, h := range hashes { - txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) + txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) return @@ -275,27 +283,25 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui }() } - for { - select { - case ntf := <-rcvr: - switch ntf.Type { - case neorpc.BlockEventID: - waitErr = ErrTxNotAccepted - return - case neorpc.ExecutionEventID: - res = ntf.Value.(*state.AppExecResult) - return - case neorpc.MissedEventID: - // We're toast, retry with non-ws client. - wsWaitErr = errors.New("some event was missed") - return - } - case <-w.ws.Context().Done(): - waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) - return - case <-ctx.Done(): - waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) + select { + case _, ok := <-bRcvr: + if !ok { + // We're toast, retry with non-ws client. + wsWaitErr = ErrMissedEvent return } + waitErr = ErrTxNotAccepted + case aer, ok := <-aerRcvr: + if !ok { + // We're toast, retry with non-ws client. + wsWaitErr = ErrMissedEvent + return + } + res = aer + case <-w.ws.Context().Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) + case <-ctx.Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) } + return } diff --git a/pkg/rpcclient/actor/waiter_test.go b/pkg/rpcclient/actor/waiter_test.go index 03a516ce4..881be6a7f 100644 --- a/pkg/rpcclient/actor/waiter_test.go +++ b/pkg/rpcclient/actor/waiter_test.go @@ -11,7 +11,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/stretchr/testify/require" ) @@ -20,20 +19,20 @@ type AwaitableRPCClient struct { RPCClient chLock sync.RWMutex - subBlockCh chan<- rpcclient.Notification - subTxCh chan<- rpcclient.Notification + subBlockCh chan<- *block.Block + subTxCh chan<- *state.AppExecResult } -func (c *AwaitableRPCClient) SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) { +func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) { c.chLock.Lock() defer c.chLock.Unlock() - c.subBlockCh = rcvrCh + c.subBlockCh = rcvr return "1", nil } -func (c *AwaitableRPCClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) { +func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) { c.chLock.Lock() defer c.chLock.Unlock() - c.subTxCh = rcvrCh + c.subTxCh = rcvr return "2", nil } func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil } @@ -163,10 +162,7 @@ func TestWSWaiter_Wait(t *testing.T) { check(t, func() { c.chLock.RLock() defer c.chLock.RUnlock() - c.subBlockCh <- rpcclient.Notification{ - Type: neorpc.ExecutionEventID, - Value: expected, - } + c.subTxCh <- expected }) // Missing AER after VUB. @@ -178,9 +174,6 @@ func TestWSWaiter_Wait(t *testing.T) { check(t, func() { c.chLock.RLock() defer c.chLock.RUnlock() - c.subBlockCh <- rpcclient.Notification{ - Type: neorpc.BlockEventID, - Value: &block.Block{}, - } + c.subBlockCh <- &block.Block{} }) } diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index a2ade71d5..236bd9fa0 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -34,6 +34,11 @@ type WSClient struct { // WSClient to block even regular requests. This channel is not buffered. // In case of protocol error or upon connection closure, this channel will // be closed, so make sure to handle this. + // + // Deprecated: please, use custom channels with ReceiveBlocks, ReceiveTransactions, + // ReceiveExecutionNotifications, ReceiveExecutions, ReceiveNotaryRequests + // methods to subscribe for notifications. This field will be removed in future + // versions. Notifications chan Notification ws *websocket.Conn @@ -47,29 +52,261 @@ type WSClient struct { subscriptionsLock sync.RWMutex subscriptions map[string]notificationReceiver + // receivers is a mapping from receiver channel to a set of corresponding subscription IDs. + // It must be accessed with subscriptionsLock taken. Its keys must be used to deliver + // notifications, if channel is not in the receivers list and corresponding subscription + // still exists, notification must not be sent. + receivers map[interface{}][]string respLock sync.RWMutex respChannels map[uint64]chan *neorpc.Response } -// notificationReceiver is a server events receiver. It stores desired notifications ID -// and filter and a channel used to receive matching notifications. -type notificationReceiver struct { - typ neorpc.EventID - filter interface{} - ch chan<- Notification +// notificationReceiver is an interface aimed to provide WS subscriber functionality +// for different types of subscriptions. +type notificationReceiver interface { + // Comparator provides notification filtering functionality. + rpcevent.Comparator + // Receiver returns notification receiver channel. + Receiver() interface{} + // TrySend checks whether notification passes receiver filter and sends it + // to the underlying channel if so. + TrySend(ntf Notification) bool + // Close closes underlying receiver channel. + Close() } -// EventID implements neorpc.Comparator interface and returns notification ID. -func (r notificationReceiver) EventID() neorpc.EventID { - return r.typ +// blockReceiver stores information about block events subscriber. +type blockReceiver struct { + filter *neorpc.BlockFilter + ch chan<- *block.Block } -// Filter implements neorpc.Comparator interface and returns notification filter. -func (r notificationReceiver) Filter() interface{} { +// EventID implements neorpc.Comparator interface. +func (r *blockReceiver) EventID() neorpc.EventID { + return neorpc.BlockEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *blockReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *blockReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *blockReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*block.Block) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *blockReceiver) Close() { + close(r.ch) +} + +// txReceiver stores information about transaction events subscriber. +type txReceiver struct { + filter *neorpc.TxFilter + ch chan<- *transaction.Transaction +} + +// EventID implements neorpc.Comparator interface. +func (r *txReceiver) EventID() neorpc.EventID { + return neorpc.TransactionEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *txReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *txReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *txReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*transaction.Transaction) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *txReceiver) Close() { + close(r.ch) +} + +// executionNotificationReceiver stores information about execution notifications subscriber. +type executionNotificationReceiver struct { + filter *neorpc.NotificationFilter + ch chan<- *state.ContainedNotificationEvent +} + +// EventID implements neorpc.Comparator interface. +func (r *executionNotificationReceiver) EventID() neorpc.EventID { + return neorpc.NotificationEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *executionNotificationReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *executionNotificationReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *executionNotificationReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*state.ContainedNotificationEvent) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *executionNotificationReceiver) Close() { + close(r.ch) +} + +// executionReceiver stores information about application execution results subscriber. +type executionReceiver struct { + filter *neorpc.ExecutionFilter + ch chan<- *state.AppExecResult +} + +// EventID implements neorpc.Comparator interface. +func (r *executionReceiver) EventID() neorpc.EventID { + return neorpc.ExecutionEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *executionReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *executionReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *executionReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*state.AppExecResult) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *executionReceiver) Close() { + close(r.ch) +} + +// notaryRequestReceiver stores information about notary requests subscriber. +type notaryRequestReceiver struct { + filter *neorpc.TxFilter + ch chan<- *result.NotaryRequestEvent +} + +// EventID implements neorpc.Comparator interface. +func (r *notaryRequestReceiver) EventID() neorpc.EventID { + return neorpc.NotaryRequestEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *notaryRequestReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *notaryRequestReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *notaryRequestReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*result.NotaryRequestEvent) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *notaryRequestReceiver) Close() { + close(r.ch) +} + +// naiveReceiver is a structure leaved for deprecated single channel based notifications +// delivering. +// +// Deprecated: this receiver must be removed after outdated subscriptions API removal. +type naiveReceiver struct { + eventID neorpc.EventID + filter interface{} + ch chan<- Notification +} + +// EventID implements neorpc.Comparator interface. +func (r *naiveReceiver) EventID() neorpc.EventID { + return r.eventID +} + +// Filter implements neorpc.Comparator interface. +func (r *naiveReceiver) Filter() interface{} { return r.filter } +// Receiver implements notificationReceiver interface. +func (r *naiveReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *naiveReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *naiveReceiver) Close() { + r.ch <- Notification{ + Type: neorpc.MissedEventID, // backwards-compatible behaviour + } +} + // Notification represents a server-generated notification for client subscriptions. // Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent // *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type. @@ -111,6 +348,9 @@ const ( wsWriteLimit = wsPingPeriod / 2 ) +// ErrNilNotificationReceiver is returned when notification receiver channel is nil. +var ErrNilNotificationReceiver = errors.New("nil notification receiver") + // errConnClosedByUser is a WSClient error used iff the user calls (*WSClient).Close method by himself. var errConnClosedByUser = errors.New("connection closed by user") @@ -138,6 +378,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error respChannels: make(map[uint64]chan *neorpc.Response), requests: make(chan *neorpc.Request), subscriptions: make(map[string]notificationReceiver), + receivers: make(map[interface{}][]string), } err = initClient(ctx, &wsc.Client, endpoint, opts) @@ -237,13 +478,22 @@ readloop: break readloop } } - ok := make(map[chan<- Notification]bool) + if event == neorpc.MissedEventID { + c.subscriptionsLock.Lock() + for rcvr, ids := range c.receivers { + c.subscriptions[ids[0]].Close() + delete(c.receivers, rcvr) + } + c.subscriptionsLock.Unlock() + continue readloop + } c.subscriptionsLock.RLock() - for _, rcvr := range c.subscriptions { - ntf := Notification{Type: event, Value: val} - if (rpcevent.Matches(rcvr, ntf) || event == neorpc.MissedEventID /*missed event must be delivered to each receiver*/) && !ok[rcvr.ch] { - ok[rcvr.ch] = true // strictly one notification per channel - rcvr.ch <- ntf // this will block other receivers + ntf := Notification{Type: event, Value: val} + for _, ids := range c.receivers { + for _, id := range ids { + if c.subscriptions[id].TrySend(ntf) { + break // strictly one notification per channel + } } } c.subscriptionsLock.RUnlock() @@ -370,92 +620,110 @@ func (c *WSClient) performSubscription(params []interface{}, rcvr notificationRe defer c.subscriptionsLock.Unlock() c.subscriptions[resp] = rcvr + ch := rcvr.Receiver() + c.receivers[ch] = append(c.receivers[ch], resp) return resp, nil } func (c *WSClient) performUnsubscription(id string) error { - var resp bool - c.subscriptionsLock.Lock() defer c.subscriptionsLock.Unlock() if _, ok := c.subscriptions[id]; !ok { return errors.New("no subscription with this ID") } - if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil { - return err - } - if !resp { - return errors.New("unsubscribe method returned false result") - } - delete(c.subscriptions, id) - return nil + return c.removeSubscription(id) } // SubscribeForNewBlocks adds subscription for new block events to this instance // of the client. It can be filtered by primary consensus node index and/or block -// index allowing to receive blocks since the specified index only, nil value is -// treated as missing filter. +// index allowing to receive blocks since/till the specified index only, nil value +// is treated as missing filter. // -// Deprecated: please, use SubscribeForNewBlocksWithChan. This method will be removed in future versions. +// Deprecated: please, use ReceiveBlocks. This method will be removed in future versions. func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) { - return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications) -} - -// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the -// new block events. Events can be filtered by primary consensus node index, nil -// value doesn't add any filters. If the receiver channel is nil, then the default -// Notifications channel will be used. The receiver channel must be properly read -// and drained after usage in order not to block other notification receivers. -func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications - } - params := []interface{}{"block_added"} var flt *neorpc.BlockFilter if primary != nil || sinceIndex != nil || tillIndex != nil { flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex} - params = append(params, flt) } - rcvr := notificationReceiver{ - typ: neorpc.BlockEventID, + params := []interface{}{"block_added"} + if flt != nil { + params = append(params, *flt) + } + r := &naiveReceiver{ + eventID: neorpc.BlockEventID, + filter: flt, + ch: c.Notifications, + } + return c.performSubscription(params, r) +} + +// ReceiveBlocks registers provided channel as a receiver for the new block events. +// Events can be filtered by the given BlockFilter, nil value doesn't add any filter. +// The receiver channel must be properly read and drained after usage in order not +// to block other notification receivers. If multiple subscriptions share the same +// receiver channel, then matching notification is only sent once per channel. The +// receiver channel will be closed by the WSClient immediately after MissedEvent is +// received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"block_added"} + if flt != nil { + params = append(params, *flt) + } + r := &blockReceiver{ filter: flt, - ch: rcvrCh, + ch: rcvr, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) } // SubscribeForNewTransactions adds subscription for new transaction events to // this instance of the client. It can be filtered by the sender and/or the signer, nil // value is treated as missing filter. // -// Deprecated: please, use SubscribeForNewTransactionsWithChan. This method will be removed in future versions. +// Deprecated: please, use ReceiveTransactions. This method will be removed in future versions. func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) { - return c.SubscribeForNewTransactionsWithChan(sender, signer, c.Notifications) -} - -// SubscribeForNewTransactionsWithChan registers provided channel as a receiver -// for new transaction events. Events can be filtered by the sender and/or the -// signer, nil value is treated as missing filter. If the receiver channel is nil, -// then the default Notifications channel will be used. The receiver channel must be -// properly read and drained after usage in order not to block other notification -// receivers. -func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, signer *util.Uint160, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications - } - params := []interface{}{"transaction_added"} var flt *neorpc.TxFilter if sender != nil || signer != nil { flt = &neorpc.TxFilter{Sender: sender, Signer: signer} + } + params := []interface{}{"transaction_added"} + if flt != nil { params = append(params, *flt) } - rcvr := notificationReceiver{ - typ: neorpc.TransactionEventID, - filter: flt, - ch: rcvrCh, + r := &naiveReceiver{ + eventID: neorpc.TransactionEventID, + filter: flt, + ch: c.Notifications, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) +} + +// ReceiveTransactions registers provided channel as a receiver for new transaction +// events. Events can be filtered by the given TxFilter, nil value doesn't add any +// filter. The receiver channel must be properly read and drained after usage in +// order not to block other notification receivers. If multiple subscriptions share +// the same receiver channel, then matching notification is only sent once per channel. +// The receiver channel will be closed by the WSClient immediately after MissedEvent is +// received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveTransactions(flt *neorpc.TxFilter, rcvr chan<- *transaction.Transaction) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"transaction_added"} + if flt != nil { + params = append(params, *flt) + } + r := &txReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) } // SubscribeForExecutionNotifications adds subscription for notifications @@ -463,33 +731,45 @@ func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, sig // filtered by the contract's hash (that emits notifications), nil value puts no such // restrictions. // -// Deprecated: please, use SubscribeForExecutionNotificationsWithChan. This method will be removed in future versions. +// Deprecated: please, use ReceiveExecutionNotifications. This method will be removed in future versions. func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) { - return c.SubscribeForExecutionNotificationsWithChan(contract, name, c.Notifications) -} - -// SubscribeForExecutionNotificationsWithChan registers provided channel as a -// receiver for execution events. Events can be filtered by the contract's hash -// (that emits notifications), nil value puts no such restrictions. If the -// receiver channel is nil, then the default Notifications channel will be used. -// The receiver channel must be properly read and drained after usage in order -// not to block other notification receivers. -func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uint160, name *string, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications - } - params := []interface{}{"notification_from_execution"} var flt *neorpc.NotificationFilter if contract != nil || name != nil { flt = &neorpc.NotificationFilter{Contract: contract, Name: name} + } + params := []interface{}{"notification_from_execution"} + if flt != nil { params = append(params, *flt) } - rcvr := notificationReceiver{ - typ: neorpc.NotificationEventID, - filter: flt, - ch: rcvrCh, + r := &naiveReceiver{ + eventID: neorpc.NotificationEventID, + filter: flt, + ch: c.Notifications, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) +} + +// ReceiveExecutionNotifications registers provided channel as a receiver for execution +// events. Events can be filtered by the given NotificationFilter, nil value doesn't add +// any filter. The receiver channel must be properly read and drained after usage in +// order not to block other notification receivers. If multiple subscriptions share the +// same receiver channel, then matching notification is only sent once per channel. The +// receiver channel will be closed by the WSClient immediately after MissedEvent is +// received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"notification_from_execution"} + if flt != nil { + params = append(params, *flt) + } + r := &executionNotificationReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) } // SubscribeForTransactionExecutions adds subscription for application execution @@ -498,39 +778,56 @@ func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uin // transactions; it can also be filtered by script container hash. // nil value means no filtering. // -// Deprecated: please, use SubscribeForTransactionExecutionsWithChan. This method will be removed in future versions. -func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) { - return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications) -} - -// SubscribeForTransactionExecutionsWithChan registers provided channel as a -// receiver for application execution result events generated during transaction -// execution. Events can be filtered by state (HALT/FAULT) to check for successful -// or failing transactions; it can also be filtered by script container hash. -// nil value means no filtering. If the receiver channel is nil, then the default -// Notifications channel will be used. The receiver channel must be properly read -// and drained after usage in order not to block other notification receivers. -func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications +// Deprecated: please, use ReceiveExecutions. This method will be removed in future versions. +func (c *WSClient) SubscribeForTransactionExecutions(vmState *string, container *util.Uint256) (string, error) { + var flt *neorpc.ExecutionFilter + if vmState != nil || container != nil { + flt = &neorpc.ExecutionFilter{State: vmState, Container: container} } params := []interface{}{"transaction_executed"} - var flt *neorpc.ExecutionFilter - if state != nil || container != nil { - if state != nil { - if *state != "HALT" && *state != "FAULT" { + if flt != nil { + if flt.State != nil { + if *flt.State != "HALT" && *flt.State != "FAULT" { return "", errors.New("bad state parameter") } } - flt = &neorpc.ExecutionFilter{State: state, Container: container} params = append(params, *flt) } - rcvr := notificationReceiver{ - typ: neorpc.ExecutionEventID, - filter: flt, - ch: rcvrCh, + r := &naiveReceiver{ + eventID: neorpc.ExecutionEventID, + filter: flt, + ch: c.Notifications, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) +} + +// ReceiveExecutions registers provided channel as a receiver for +// application execution result events generated during transaction execution. +// Events can be filtered by the given ExecutionFilter, nil value doesn't add any filter. +// The receiver channel must be properly read and drained after usage in order not +// to block other notification receivers. If multiple subscriptions share the same +// receiver channel, then matching notification is only sent once per channel. The +// receiver channel will be closed by the WSClient immediately after MissedEvent is +// received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"transaction_executed"} + if flt != nil { + if flt.State != nil { + if *flt.State != "HALT" && *flt.State != "FAULT" { + return "", errors.New("bad state parameter") + } + } + params = append(params, *flt) + } + r := &executionReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) } // SubscribeForNotaryRequests adds subscription for notary request payloads @@ -538,33 +835,47 @@ func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, cont // request sender's hash, or main tx signer's hash, nil value puts no such // restrictions. // -// Deprecated: please, use SubscribeForNotaryRequestsWithChan. This method will be removed in future versions. +// Deprecated: please, use ReceiveNotaryRequests. This method will be removed in future versions. func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) { - return c.SubscribeForNotaryRequestsWithChan(sender, mainSigner, c.Notifications) -} - -// SubscribeForNotaryRequestsWithChan registers provided channel as a receiver -// for notary request payload addition or removal events. It can be filtered by -// request sender's hash, or main tx signer's hash, nil value puts no such -// restrictions. If the receiver channel is nil, then the default Notifications -// channel will be used. The receiver channel must be properly read and drained -// after usage in order not to block other notification receivers. -func (c *WSClient) SubscribeForNotaryRequestsWithChan(sender *util.Uint160, mainSigner *util.Uint160, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications + var flt *neorpc.TxFilter + if sender != nil || mainSigner != nil { + flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner} } params := []interface{}{"notary_request_event"} - var flt *neorpc.TxFilter - if sender != nil { - flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner} + if flt != nil { params = append(params, *flt) } - rcvr := notificationReceiver{ - typ: neorpc.NotaryRequestEventID, - filter: flt, - ch: rcvrCh, + r := &naiveReceiver{ + eventID: neorpc.NotaryRequestEventID, + filter: flt, + ch: c.Notifications, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) +} + +// ReceiveNotaryRequests registers provided channel as a receiver for notary request +// payload addition or removal events. Events can be filtered by the given TxFilter +// where sender corresponds to notary request sender (the second fallback transaction +// signer) and signer corresponds to main transaction signers. nil value doesn't add +// any filter. The receiver channel must be properly read and drained after usage in +// order not to block other notification receivers. If multiple subscriptions share +// the same receiver channel, then matching notification is only sent once per channel. +// The receiver channel will be closed by the WSClient immediately after MissedEvent +// is received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveNotaryRequests(flt *neorpc.TxFilter, rcvr chan<- *result.NotaryRequestEvent) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"notary_request_event"} + if flt != nil { + params = append(params, *flt) + } + r := ¬aryRequestReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) } // Unsubscribe removes subscription for the given event stream. @@ -578,18 +889,43 @@ func (c *WSClient) UnsubscribeAll() error { defer c.subscriptionsLock.Unlock() for id := range c.subscriptions { - var resp bool - if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil { + err := c.removeSubscription(id) + if err != nil { return err } - if !resp { - return errors.New("unsubscribe method returned false result") - } - delete(c.subscriptions, id) } return nil } +// removeSubscription is internal method that removes subscription with the given +// ID from the list of subscriptions and receivers. It must be performed under +// subscriptions lock. +func (c *WSClient) removeSubscription(id string) error { + var resp bool + if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil { + return err + } + if !resp { + return errors.New("unsubscribe method returned false result") + } + rcvr := c.subscriptions[id] + ch := rcvr.Receiver() + ids := c.receivers[ch] + for i, rcvrID := range ids { + if rcvrID == id { + ids = append(ids[:i], ids[i+1:]...) + break + } + } + if len(ids) == 0 { + delete(c.receivers, ch) + } else { + c.receivers[ch] = ids + } + delete(c.subscriptions, id) + return nil +} + // setCloseErr is a thread-safe method setting closeErr in case if it's not yet set. func (c *WSClient) setCloseErr(err error) { c.closeErrLock.Lock() diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index e0ea0fa8f..1d7f3f774 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -15,11 +15,16 @@ import ( "github.com/gorilla/websocket" "github.com/nspcc-dev/neo-go/pkg/config/netmode" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/services/rpcsrv/params" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" ) @@ -32,31 +37,26 @@ func TestWSClientClose(t *testing.T) { } func TestWSClientSubscription(t *testing.T) { - ch := make(chan Notification) + bCh := make(chan *block.Block) + txCh := make(chan *transaction.Transaction) + aerCh := make(chan *state.AppExecResult) + ntfCh := make(chan *state.ContainedNotificationEvent) + ntrCh := make(chan *result.NotaryRequestEvent) var cases = map[string]func(*WSClient) (string, error){ "blocks": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, nil) - }, - "blocks_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch) + return wsc.ReceiveBlocks(nil, bCh) }, "transactions": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewTransactionsWithChan(nil, nil, nil) - }, - "transactions_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewTransactionsWithChan(nil, nil, ch) + return wsc.ReceiveTransactions(nil, txCh) }, "notifications": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, nil) - }, - "notifications_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch) + return wsc.ReceiveExecutionNotifications(nil, ntfCh) }, "executions": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, nil) + return wsc.ReceiveExecutions(nil, aerCh) }, - "executions_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch) + "notary requests": func(wsc *WSClient) (string, error) { + return wsc.ReceiveNotaryRequests(nil, ntrCh) }, } t.Run("good", func(t *testing.T) { @@ -96,13 +96,13 @@ func TestWSClientUnsubscription(t *testing.T) { var cases = map[string]responseCheck{ "good": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = notificationReceiver{} + wsc.subscriptions["0"] = &blockReceiver{} err := wsc.Unsubscribe("0") require.NoError(t, err) }}, "all": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = notificationReceiver{} + wsc.subscriptions["0"] = &blockReceiver{} err := wsc.UnsubscribeAll() require.NoError(t, err) require.Equal(t, 0, len(wsc.subscriptions)) @@ -113,13 +113,13 @@ func TestWSClientUnsubscription(t *testing.T) { }}, "error returned": {`{"jsonrpc": "2.0", "id": 1, "error":{"code":-32602,"message":"Invalid Params"}}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = notificationReceiver{} + wsc.subscriptions["0"] = &blockReceiver{} err := wsc.Unsubscribe("0") require.Error(t, err) }}, "false returned": {`{"jsonrpc": "2.0", "id": 1, "result": false}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = notificationReceiver{} + wsc.subscriptions["0"] = &blockReceiver{} err := wsc.Unsubscribe("0") require.Error(t, err) }}, @@ -144,7 +144,7 @@ func TestWSClientEvents(t *testing.T) { `{"jsonrpc":"2.0","method":"notification_from_execution","params":[{"container":"0xe1cd5e57e721d2a2e05fb1f08721b12057b25ab1dd7fd0f33ee1639932fdfad7","contract":"0x1b4357bff5a01bdf2a6581247cf9ed1e24629176","eventname":"contract call","state":{"type":"Array","value":[{"type":"ByteString","value":"dHJhbnNmZXI="},{"type":"Array","value":[{"type":"ByteString","value":"dpFiJB7t+XwkgWUq3xug9b9XQxs="},{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"Integer","value":"1000"}]}]}}]}`, `{"jsonrpc":"2.0","method":"transaction_executed","params":[{"container":"0xf97a72b7722c109f909a8bc16c22368c5023d85828b09b127b237aace33cf099","trigger":"Application","vmstate":"HALT","gasconsumed":"6042610","stack":[],"notifications":[{"contract":"0xe65ff7b3a02d207b584a5c27057d4e9862ef01da","eventname":"contract call","state":{"type":"Array","value":[{"type":"ByteString","value":"dHJhbnNmZXI="},{"type":"Array","value":[{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"ByteString","value":"IHKCdK+vw29DoHHTKM+j5inZy7A="},{"type":"Integer","value":"123"}]}]}},{"contract":"0xe65ff7b3a02d207b584a5c27057d4e9862ef01da","eventname":"transfer","state":{"type":"Array","value":[{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"ByteString","value":"IHKCdK+vw29DoHHTKM+j5inZy7A="},{"type":"Integer","value":"123"}]}}]}]}`, fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose), - `{"jsonrpc":"2.0","method":"event_missed","params":[]}`, + `{"jsonrpc":"2.0","method":"event_missed","params":[]}`, // the last one, will trigger receiver channels closing. } srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if req.URL.Path == "/ws" && req.Method == "GET" { @@ -163,109 +163,123 @@ func TestWSClientEvents(t *testing.T) { return } })) + wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) + require.NoError(t, err) + wsc.getNextRequestID = getTestRequestID + wsc.cacheLock.Lock() + wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. + wsc.cache.network = netmode.UnitTestNet + wsc.cacheLock.Unlock() - t.Run("default ntf channel", func(t *testing.T) { - wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) - require.NoError(t, err) - wsc.getNextRequestID = getTestRequestID - wsc.cacheLock.Lock() - wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. - wsc.cache.network = netmode.UnitTestNet - wsc.cacheLock.Unlock() - // Our server mock is restricted, so perform subscriptions manually with default notifications channel. - wsc.subscriptionsLock.Lock() - wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} - wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} - wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications} - // MissedEvent must be delivered without subscription. - wsc.subscriptionsLock.Unlock() - for range events { - select { - case _, ok = <-wsc.Notifications: - case <-time.After(time.Second): - t.Fatal("timeout waiting for event") - } - require.True(t, ok) - } + // Our server mock is restricted, so perform subscriptions manually with default notifications channel. + bCh1 := make(chan *block.Block) + bCh2 := make(chan *block.Block) + aerCh1 := make(chan *state.AppExecResult) + aerCh2 := make(chan *state.AppExecResult) + aerCh3 := make(chan *state.AppExecResult) + ntfCh := make(chan *state.ContainedNotificationEvent) + halt := "HALT" + fault := "FAULT" + wsc.subscriptionsLock.Lock() + wsc.subscriptions["0"] = &blockReceiver{ch: bCh1} + wsc.receivers[chan<- *block.Block(bCh1)] = []string{"0"} + wsc.subscriptions["1"] = &blockReceiver{ch: bCh2} // two different channels subscribed for same notifications + wsc.receivers[chan<- *block.Block(bCh2)] = []string{"1"} + + wsc.subscriptions["2"] = &executionNotificationReceiver{ch: ntfCh} + wsc.subscriptions["3"] = &executionNotificationReceiver{ch: ntfCh} // check duplicating subscriptions + wsc.receivers[chan<- *state.ContainedNotificationEvent(ntfCh)] = []string{"2", "3"} + + wsc.subscriptions["4"] = &executionReceiver{ch: aerCh1} + wsc.receivers[chan<- *state.AppExecResult(aerCh1)] = []string{"4"} + wsc.subscriptions["5"] = &executionReceiver{filter: &neorpc.ExecutionFilter{State: &halt}, ch: aerCh2} + wsc.receivers[chan<- *state.AppExecResult(aerCh2)] = []string{"5"} + wsc.subscriptions["6"] = &executionReceiver{filter: &neorpc.ExecutionFilter{State: &fault}, ch: aerCh3} + wsc.receivers[chan<- *state.AppExecResult(aerCh3)] = []string{"6"} + // MissedEvent must close the channels above. + + wsc.subscriptions["7"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications} + wsc.subscriptions["8"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications} // check duplicating subscriptions + wsc.subscriptions["9"] = &naiveReceiver{eventID: neorpc.ExecutionEventID, ch: wsc.Notifications} // check different events + wsc.receivers[wsc.Notifications] = []string{"7", "8", "9"} + wsc.subscriptionsLock.Unlock() + + var ( + b1Cnt, b2Cnt int + aer1Cnt, aer2Cnt, aer3Cnt int + ntfCnt int + defaultCount int + expectedb1Cnt, expectedb2Cnt = 1, 1 // single Block event + expectedaer1Cnt, expectedaer2Cnt, expectedaer3Cnt = 2, 2, 0 // two HALTED AERs + expectedntfCnt = 1 // single notification event + expectedDefaultCnt = 1 + 2 + 1 // single Block event + two AERs + missed event + aer *state.AppExecResult + ) + for b1Cnt+b2Cnt+ + aer1Cnt+aer2Cnt+aer3Cnt+ + ntfCnt+ + defaultCount != + expectedb1Cnt+expectedb2Cnt+ + expectedaer1Cnt+expectedaer2Cnt+expectedaer3Cnt+ + expectedntfCnt+ + expectedDefaultCnt { select { + case _, ok = <-bCh1: + if ok { + b1Cnt++ + } + case _, ok = <-bCh2: + if ok { + b2Cnt++ + } + case _, ok = <-aerCh1: + if ok { + aer1Cnt++ + } + case aer, ok = <-aerCh2: + if ok { + require.Equal(t, vmstate.Halt, aer.VMState) + aer2Cnt++ + } + case _, ok = <-aerCh3: + if ok { + aer3Cnt++ + } + case _, ok = <-ntfCh: + if ok { + ntfCnt++ + } case _, ok = <-wsc.Notifications: + if ok { + defaultCount++ + } case <-time.After(time.Second): t.Fatal("timeout waiting for event") } - // Connection closed by server. - require.False(t, ok) - }) - t.Run("multiple ntf channels", func(t *testing.T) { - wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) - require.NoError(t, err) - wsc.getNextRequestID = getTestRequestID - wsc.cacheLock.Lock() - wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. - wsc.cache.network = netmode.UnitTestNet - wsc.cacheLock.Unlock() + } + assert.Equal(t, expectedb1Cnt, b1Cnt) + assert.Equal(t, expectedb2Cnt, b2Cnt) + assert.Equal(t, expectedaer1Cnt, aer1Cnt) + assert.Equal(t, expectedaer2Cnt, aer2Cnt) + assert.Equal(t, expectedaer3Cnt, aer3Cnt) + assert.Equal(t, expectedntfCnt, ntfCnt) + assert.Equal(t, expectedDefaultCnt, defaultCount) - // Our server mock is restricted, so perform subscriptions manually with default notifications channel. - ch1 := make(chan Notification) - ch2 := make(chan Notification) - ch3 := make(chan Notification) - halt := "HALT" - fault := "FAULT" - wsc.subscriptionsLock.Lock() - wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} - wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} - wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications} - wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1} - wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} - wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions - wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &halt}, ch: ch2} - wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &fault}, ch: ch3} - // MissedEvent must be delivered without subscription. - wsc.subscriptionsLock.Unlock() - - var ( - defaultChCnt int - ch1Cnt int - ch2Cnt int - ch3Cnt int - expectedDefaultCnCount = len(events) - expectedCh1Cnt = 1 + 1 // Block event + Missed event - expectedCh2Cnt = 1 + 2 + 1 // Notification event + 2 Execution events + Missed event - expectedCh3Cnt = 1 // Missed event - ntf Notification - ) - for i := 0; i < expectedDefaultCnCount+expectedCh1Cnt+expectedCh2Cnt+expectedCh3Cnt; i++ { - select { - case ntf, ok = <-wsc.Notifications: - defaultChCnt++ - case ntf, ok = <-ch1: - require.True(t, ntf.Type == neorpc.BlockEventID || ntf.Type == neorpc.MissedEventID, ntf.Type) - ch1Cnt++ - case ntf, ok = <-ch2: - require.True(t, ntf.Type == neorpc.NotificationEventID || ntf.Type == neorpc.MissedEventID || ntf.Type == neorpc.ExecutionEventID) - ch2Cnt++ - case ntf, ok = <-ch3: - require.True(t, ntf.Type == neorpc.MissedEventID) - ch3Cnt++ - case <-time.After(time.Second): - t.Fatal("timeout waiting for event") - } - require.True(t, ok) - } - select { - case _, ok = <-wsc.Notifications: - case _, ok = <-ch1: - case _, ok = <-ch2: - case _, ok = <-ch3: - case <-time.After(time.Second): - t.Fatal("timeout waiting for event") - } - // Connection closed by server. - require.False(t, ok) - require.Equal(t, expectedDefaultCnCount, defaultChCnt) - require.Equal(t, expectedCh1Cnt, ch1Cnt) - require.Equal(t, expectedCh2Cnt, ch2Cnt) - require.Equal(t, expectedCh3Cnt, ch3Cnt) - }) + // Channels must be closed by server + _, ok = <-bCh1 + require.False(t, ok) + _, ok = <-bCh2 + require.False(t, ok) + _, ok = <-aerCh1 + require.False(t, ok) + _, ok = <-aerCh2 + require.False(t, ok) + _, ok = <-aerCh3 + require.False(t, ok) + _, ok = <-ntfCh + require.False(t, ok) + _, ok = <-ntfCh + require.False(t, ok) } func TestWSExecutionVMStateCheck(t *testing.T) { @@ -276,12 +290,16 @@ func TestWSExecutionVMStateCheck(t *testing.T) { wsc.getNextRequestID = getTestRequestID require.NoError(t, wsc.Init()) filter := "NONE" - _, err = wsc.SubscribeForTransactionExecutionsWithChan(&filter, nil, nil) + _, err = wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &filter}, make(chan *state.AppExecResult)) require.Error(t, err) wsc.Close() } func TestWSFilteredSubscriptions(t *testing.T) { + bCh := make(chan *block.Block) + txCh := make(chan *transaction.Transaction) + aerCh := make(chan *state.AppExecResult) + ntfCh := make(chan *state.ContainedNotificationEvent) var cases = []struct { name string clientCode func(*testing.T, *WSClient) @@ -290,7 +308,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks primary", func(t *testing.T, wsc *WSClient) { primary := 3 - _, err := wsc.SubscribeForNewBlocksWithChan(&primary, nil, nil, nil) + _, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Primary: &primary}, bCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -305,7 +323,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks since", func(t *testing.T, wsc *WSClient) { var since uint32 = 3 - _, err := wsc.SubscribeForNewBlocksWithChan(nil, &since, nil, nil) + _, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -320,7 +338,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks till", func(t *testing.T, wsc *WSClient) { var till uint32 = 3 - _, err := wsc.SubscribeForNewBlocksWithChan(nil, nil, &till, nil) + _, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Till: &till}, bCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -339,7 +357,11 @@ func TestWSFilteredSubscriptions(t *testing.T) { primary = 2 till uint32 = 5 ) - _, err := wsc.SubscribeForNewBlocksWithChan(&primary, &since, &till, nil) + _, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{ + Primary: &primary, + Since: &since, + Till: &till, + }, bCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -354,7 +376,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"transactions sender", func(t *testing.T, wsc *WSClient) { sender := util.Uint160{1, 2, 3, 4, 5} - _, err := wsc.SubscribeForNewTransactionsWithChan(&sender, nil, nil) + _, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender}, txCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -368,7 +390,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"transactions signer", func(t *testing.T, wsc *WSClient) { signer := util.Uint160{0, 42} - _, err := wsc.SubscribeForNewTransactionsWithChan(nil, &signer, nil) + _, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Signer: &signer}, txCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -383,7 +405,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { sender := util.Uint160{1, 2, 3, 4, 5} signer := util.Uint160{0, 42} - _, err := wsc.SubscribeForNewTransactionsWithChan(&sender, &signer, nil) + _, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender, Signer: &signer}, txCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -397,7 +419,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"notifications contract hash", func(t *testing.T, wsc *WSClient) { contract := util.Uint160{1, 2, 3, 4, 5} - _, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, nil, nil) + _, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ntfCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -411,7 +433,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"notifications name", func(t *testing.T, wsc *WSClient) { name := "my_pretty_notification" - _, err := wsc.SubscribeForExecutionNotificationsWithChan(nil, &name, nil) + _, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Name: &name}, ntfCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -426,7 +448,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { contract := util.Uint160{1, 2, 3, 4, 5} name := "my_pretty_notification" - _, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, &name, nil) + _, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract, Name: &name}, ntfCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -440,7 +462,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"executions state", func(t *testing.T, wsc *WSClient) { state := "FAULT" - _, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, nil, nil) + _, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &state}, aerCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -454,7 +476,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"executions container", func(t *testing.T, wsc *WSClient) { container := util.Uint256{1, 2, 3} - _, err := wsc.SubscribeForTransactionExecutionsWithChan(nil, &container, nil) + _, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &container}, aerCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -469,7 +491,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { state := "FAULT" container := util.Uint256{1, 2, 3} - _, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, &container, nil) + _, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &state, Container: &container}, aerCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) {