From 0d39602a503cbcdc415c695ff4d65c99abdae7e9 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 26 Oct 2022 08:36:22 +0300 Subject: [PATCH 1/6] docs: adjust docmentation for execution results notifications --- docs/notifications.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/notifications.md b/docs/notifications.md index 8a2b7ffdb..740dfb392 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -17,7 +17,7 @@ Currently supported events: * notification generated during execution Contents: container hash, contract hash, notification name, stack item. Filters: contract hash, notification name. - * transaction executed + * transaction/persisting script executed Contents: application execution result. Filters: VM state, script container hash. * new/removed P2P notary request (if `P2PSigExtensions` are enabled) @@ -34,9 +34,13 @@ Filters use conjunctional logic. announcing the block itself * transaction notifications are only announced for successful transactions * all announcements are being done in the same order they happen on the chain - First, transaction execution is announced. It is then followed by notifications - generated during this execution. Next, follows the transaction announcement. - Transaction announcements are ordered the same way they're in the block. + First, OnPersist script execution is announced followed by notifications generated + during the script execution. After that transaction execution is announced. It is + then followed by notifications generated during this execution. Next, follows the + transaction announcement. Transaction announcements are ordered the same way + they're in the block. After all in-block transactions announcements PostPersist + script execution is announced followed by notifications generated during the + script execution. Finally, block announcement is followed. * unsubscription may not cancel pending, but not yet sent events ## Subscription management @@ -72,7 +76,7 @@ Recognized stream names: * `transaction_executed` Filter: `state` field containing `HALT` or `FAULT` string for successful and failed executions respectively and/or `container` field containing - script container hash. + script container (block/transaction) hash. * `notary_request_event` Filter: `sender` field containing a string with hex-encoded Uint160 (LE representation) for notary request's `Sender` and/or `signer` in the same From 2a53db42af8be547e913e32c3e3a54461ed10108 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 26 Oct 2022 10:24:08 +0300 Subject: [PATCH 2/6] neorpc: adjust and extend event filters documentation --- pkg/neorpc/types.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/neorpc/types.go b/pkg/neorpc/types.go index aa20ec50c..20e1d2359 100644 --- a/pkg/neorpc/types.go +++ b/pkg/neorpc/types.go @@ -72,29 +72,34 @@ type ( } // BlockFilter is a wrapper structure for the block event filter. It allows - // to filter blocks by primary index and by block index (allowing blocks since - // the specified index). + // to filter blocks by primary index and/or by block index (allowing blocks + // since/till the specified index inclusively). nil value treated as missing + // filter. BlockFilter struct { Primary *int `json:"primary,omitempty"` Since *uint32 `json:"since,omitempty"` Till *uint32 `json:"till,omitempty"` } // TxFilter is a wrapper structure for the transaction event filter. It - // allows to filter transactions by senders and signers. + // allows to filter transactions by senders and/or signers. nil value treated + // as missing filter. TxFilter struct { Sender *util.Uint160 `json:"sender,omitempty"` Signer *util.Uint160 `json:"signer,omitempty"` } // NotificationFilter is a wrapper structure representing a filter used for // notifications generated during transaction execution. Notifications can - // be filtered by contract hash and by name. + // be filtered by contract hash and/or by name. nil value treated as missing + // filter. NotificationFilter struct { Contract *util.Uint160 `json:"contract,omitempty"` Name *string `json:"name,omitempty"` } - // ExecutionFilter is a wrapper structure used for transaction execution - // events. It allows to choose failing or successful transactions based - // on their VM state. + // ExecutionFilter is a wrapper structure used for transaction and persisting + // scripts execution events. It allows to choose failing or successful + // transactions and persisting scripts based on their VM state and/or to + // choose execution event with the specified container. nil value treated as + // missing filter. ExecutionFilter struct { State *string `json:"state,omitempty"` Container *util.Uint256 `json:"container,omitempty"` From 0a5905390c4bc3cb2dcd7b8d7d9616b2f43223fb Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 25 Oct 2022 15:11:24 +0300 Subject: [PATCH 3/6] rpc: refactor WSClient subscriptions API Make it more specific, close #2756. --- pkg/rpcclient/actor/waiter.go | 64 +-- pkg/rpcclient/actor/waiter_test.go | 23 +- pkg/rpcclient/wsclient.go | 616 ++++++++++++++++++++++------- pkg/rpcclient/wsclient_test.go | 284 +++++++------ 4 files changed, 672 insertions(+), 315 deletions(-) diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 38685bc90..4eae26f81 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -6,10 +6,10 @@ import ( "fmt" "time" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" ) @@ -30,6 +30,9 @@ var ( // ErrAwaitingNotSupported is returned from Wait method if Waiter instance // doesn't support transaction awaiting. ErrAwaitingNotSupported = errors.New("awaiting not supported") + // ErrMissedEvent is returned when RPCEventWaiter closes receiver channel + // which happens if missed event was received from the RPC server. + ErrMissedEvent = errors.New("some event was missed") ) type ( @@ -65,8 +68,8 @@ type ( RPCEventWaiter interface { RPCPollingWaiter - SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) - SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) + ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) + ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) Unsubscribe(id string) error } ) @@ -223,22 +226,27 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui } } }() - rcvr := make(chan rpcclient.Notification) + bRcvr := make(chan *block.Block) + aerRcvr := make(chan *state.AppExecResult) defer func() { drainLoop: - // Drain rcvr to avoid other notification receivers blocking. + // Drain receivers to avoid other notification receivers blocking. for { select { - case <-rcvr: + case <-bRcvr: + case <-aerRcvr: default: break drainLoop } } - close(rcvr) + if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) { + close(bRcvr) + close(aerRcvr) + } }() // Execution event follows the block event, thus wait until the block next to the VUB to be sure. since := vub + 1 - blocksID, err := w.ws.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) + blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return @@ -256,7 +264,7 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui } }() for _, h := range hashes { - txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) + txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) return @@ -275,27 +283,25 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui }() } - for { - select { - case ntf := <-rcvr: - switch ntf.Type { - case neorpc.BlockEventID: - waitErr = ErrTxNotAccepted - return - case neorpc.ExecutionEventID: - res = ntf.Value.(*state.AppExecResult) - return - case neorpc.MissedEventID: - // We're toast, retry with non-ws client. - wsWaitErr = errors.New("some event was missed") - return - } - case <-w.ws.Context().Done(): - waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) - return - case <-ctx.Done(): - waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) + select { + case _, ok := <-bRcvr: + if !ok { + // We're toast, retry with non-ws client. + wsWaitErr = ErrMissedEvent return } + waitErr = ErrTxNotAccepted + case aer, ok := <-aerRcvr: + if !ok { + // We're toast, retry with non-ws client. + wsWaitErr = ErrMissedEvent + return + } + res = aer + case <-w.ws.Context().Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) + case <-ctx.Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) } + return } diff --git a/pkg/rpcclient/actor/waiter_test.go b/pkg/rpcclient/actor/waiter_test.go index 03a516ce4..881be6a7f 100644 --- a/pkg/rpcclient/actor/waiter_test.go +++ b/pkg/rpcclient/actor/waiter_test.go @@ -11,7 +11,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/stretchr/testify/require" ) @@ -20,20 +19,20 @@ type AwaitableRPCClient struct { RPCClient chLock sync.RWMutex - subBlockCh chan<- rpcclient.Notification - subTxCh chan<- rpcclient.Notification + subBlockCh chan<- *block.Block + subTxCh chan<- *state.AppExecResult } -func (c *AwaitableRPCClient) SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) { +func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) { c.chLock.Lock() defer c.chLock.Unlock() - c.subBlockCh = rcvrCh + c.subBlockCh = rcvr return "1", nil } -func (c *AwaitableRPCClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) { +func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) { c.chLock.Lock() defer c.chLock.Unlock() - c.subTxCh = rcvrCh + c.subTxCh = rcvr return "2", nil } func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil } @@ -163,10 +162,7 @@ func TestWSWaiter_Wait(t *testing.T) { check(t, func() { c.chLock.RLock() defer c.chLock.RUnlock() - c.subBlockCh <- rpcclient.Notification{ - Type: neorpc.ExecutionEventID, - Value: expected, - } + c.subTxCh <- expected }) // Missing AER after VUB. @@ -178,9 +174,6 @@ func TestWSWaiter_Wait(t *testing.T) { check(t, func() { c.chLock.RLock() defer c.chLock.RUnlock() - c.subBlockCh <- rpcclient.Notification{ - Type: neorpc.BlockEventID, - Value: &block.Block{}, - } + c.subBlockCh <- &block.Block{} }) } diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index a2ade71d5..236bd9fa0 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -34,6 +34,11 @@ type WSClient struct { // WSClient to block even regular requests. This channel is not buffered. // In case of protocol error or upon connection closure, this channel will // be closed, so make sure to handle this. + // + // Deprecated: please, use custom channels with ReceiveBlocks, ReceiveTransactions, + // ReceiveExecutionNotifications, ReceiveExecutions, ReceiveNotaryRequests + // methods to subscribe for notifications. This field will be removed in future + // versions. Notifications chan Notification ws *websocket.Conn @@ -47,29 +52,261 @@ type WSClient struct { subscriptionsLock sync.RWMutex subscriptions map[string]notificationReceiver + // receivers is a mapping from receiver channel to a set of corresponding subscription IDs. + // It must be accessed with subscriptionsLock taken. Its keys must be used to deliver + // notifications, if channel is not in the receivers list and corresponding subscription + // still exists, notification must not be sent. + receivers map[interface{}][]string respLock sync.RWMutex respChannels map[uint64]chan *neorpc.Response } -// notificationReceiver is a server events receiver. It stores desired notifications ID -// and filter and a channel used to receive matching notifications. -type notificationReceiver struct { - typ neorpc.EventID - filter interface{} - ch chan<- Notification +// notificationReceiver is an interface aimed to provide WS subscriber functionality +// for different types of subscriptions. +type notificationReceiver interface { + // Comparator provides notification filtering functionality. + rpcevent.Comparator + // Receiver returns notification receiver channel. + Receiver() interface{} + // TrySend checks whether notification passes receiver filter and sends it + // to the underlying channel if so. + TrySend(ntf Notification) bool + // Close closes underlying receiver channel. + Close() } -// EventID implements neorpc.Comparator interface and returns notification ID. -func (r notificationReceiver) EventID() neorpc.EventID { - return r.typ +// blockReceiver stores information about block events subscriber. +type blockReceiver struct { + filter *neorpc.BlockFilter + ch chan<- *block.Block } -// Filter implements neorpc.Comparator interface and returns notification filter. -func (r notificationReceiver) Filter() interface{} { +// EventID implements neorpc.Comparator interface. +func (r *blockReceiver) EventID() neorpc.EventID { + return neorpc.BlockEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *blockReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *blockReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *blockReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*block.Block) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *blockReceiver) Close() { + close(r.ch) +} + +// txReceiver stores information about transaction events subscriber. +type txReceiver struct { + filter *neorpc.TxFilter + ch chan<- *transaction.Transaction +} + +// EventID implements neorpc.Comparator interface. +func (r *txReceiver) EventID() neorpc.EventID { + return neorpc.TransactionEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *txReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *txReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *txReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*transaction.Transaction) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *txReceiver) Close() { + close(r.ch) +} + +// executionNotificationReceiver stores information about execution notifications subscriber. +type executionNotificationReceiver struct { + filter *neorpc.NotificationFilter + ch chan<- *state.ContainedNotificationEvent +} + +// EventID implements neorpc.Comparator interface. +func (r *executionNotificationReceiver) EventID() neorpc.EventID { + return neorpc.NotificationEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *executionNotificationReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *executionNotificationReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *executionNotificationReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*state.ContainedNotificationEvent) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *executionNotificationReceiver) Close() { + close(r.ch) +} + +// executionReceiver stores information about application execution results subscriber. +type executionReceiver struct { + filter *neorpc.ExecutionFilter + ch chan<- *state.AppExecResult +} + +// EventID implements neorpc.Comparator interface. +func (r *executionReceiver) EventID() neorpc.EventID { + return neorpc.ExecutionEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *executionReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *executionReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *executionReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*state.AppExecResult) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *executionReceiver) Close() { + close(r.ch) +} + +// notaryRequestReceiver stores information about notary requests subscriber. +type notaryRequestReceiver struct { + filter *neorpc.TxFilter + ch chan<- *result.NotaryRequestEvent +} + +// EventID implements neorpc.Comparator interface. +func (r *notaryRequestReceiver) EventID() neorpc.EventID { + return neorpc.NotaryRequestEventID +} + +// Filter implements neorpc.Comparator interface. +func (r *notaryRequestReceiver) Filter() interface{} { + if r.filter == nil { + return nil + } + return *r.filter +} + +// Receiver implements notificationReceiver interface. +func (r *notaryRequestReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *notaryRequestReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf.Value.(*result.NotaryRequestEvent) + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *notaryRequestReceiver) Close() { + close(r.ch) +} + +// naiveReceiver is a structure leaved for deprecated single channel based notifications +// delivering. +// +// Deprecated: this receiver must be removed after outdated subscriptions API removal. +type naiveReceiver struct { + eventID neorpc.EventID + filter interface{} + ch chan<- Notification +} + +// EventID implements neorpc.Comparator interface. +func (r *naiveReceiver) EventID() neorpc.EventID { + return r.eventID +} + +// Filter implements neorpc.Comparator interface. +func (r *naiveReceiver) Filter() interface{} { return r.filter } +// Receiver implements notificationReceiver interface. +func (r *naiveReceiver) Receiver() interface{} { + return r.ch +} + +// TrySend implements notificationReceiver interface. +func (r *naiveReceiver) TrySend(ntf Notification) bool { + if rpcevent.Matches(r, ntf) { + r.ch <- ntf + return true + } + return false +} + +// Close implements notificationReceiver interface. +func (r *naiveReceiver) Close() { + r.ch <- Notification{ + Type: neorpc.MissedEventID, // backwards-compatible behaviour + } +} + // Notification represents a server-generated notification for client subscriptions. // Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent // *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type. @@ -111,6 +348,9 @@ const ( wsWriteLimit = wsPingPeriod / 2 ) +// ErrNilNotificationReceiver is returned when notification receiver channel is nil. +var ErrNilNotificationReceiver = errors.New("nil notification receiver") + // errConnClosedByUser is a WSClient error used iff the user calls (*WSClient).Close method by himself. var errConnClosedByUser = errors.New("connection closed by user") @@ -138,6 +378,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error respChannels: make(map[uint64]chan *neorpc.Response), requests: make(chan *neorpc.Request), subscriptions: make(map[string]notificationReceiver), + receivers: make(map[interface{}][]string), } err = initClient(ctx, &wsc.Client, endpoint, opts) @@ -237,13 +478,22 @@ readloop: break readloop } } - ok := make(map[chan<- Notification]bool) + if event == neorpc.MissedEventID { + c.subscriptionsLock.Lock() + for rcvr, ids := range c.receivers { + c.subscriptions[ids[0]].Close() + delete(c.receivers, rcvr) + } + c.subscriptionsLock.Unlock() + continue readloop + } c.subscriptionsLock.RLock() - for _, rcvr := range c.subscriptions { - ntf := Notification{Type: event, Value: val} - if (rpcevent.Matches(rcvr, ntf) || event == neorpc.MissedEventID /*missed event must be delivered to each receiver*/) && !ok[rcvr.ch] { - ok[rcvr.ch] = true // strictly one notification per channel - rcvr.ch <- ntf // this will block other receivers + ntf := Notification{Type: event, Value: val} + for _, ids := range c.receivers { + for _, id := range ids { + if c.subscriptions[id].TrySend(ntf) { + break // strictly one notification per channel + } } } c.subscriptionsLock.RUnlock() @@ -370,92 +620,110 @@ func (c *WSClient) performSubscription(params []interface{}, rcvr notificationRe defer c.subscriptionsLock.Unlock() c.subscriptions[resp] = rcvr + ch := rcvr.Receiver() + c.receivers[ch] = append(c.receivers[ch], resp) return resp, nil } func (c *WSClient) performUnsubscription(id string) error { - var resp bool - c.subscriptionsLock.Lock() defer c.subscriptionsLock.Unlock() if _, ok := c.subscriptions[id]; !ok { return errors.New("no subscription with this ID") } - if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil { - return err - } - if !resp { - return errors.New("unsubscribe method returned false result") - } - delete(c.subscriptions, id) - return nil + return c.removeSubscription(id) } // SubscribeForNewBlocks adds subscription for new block events to this instance // of the client. It can be filtered by primary consensus node index and/or block -// index allowing to receive blocks since the specified index only, nil value is -// treated as missing filter. +// index allowing to receive blocks since/till the specified index only, nil value +// is treated as missing filter. // -// Deprecated: please, use SubscribeForNewBlocksWithChan. This method will be removed in future versions. +// Deprecated: please, use ReceiveBlocks. This method will be removed in future versions. func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) { - return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications) -} - -// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the -// new block events. Events can be filtered by primary consensus node index, nil -// value doesn't add any filters. If the receiver channel is nil, then the default -// Notifications channel will be used. The receiver channel must be properly read -// and drained after usage in order not to block other notification receivers. -func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications - } - params := []interface{}{"block_added"} var flt *neorpc.BlockFilter if primary != nil || sinceIndex != nil || tillIndex != nil { flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex} - params = append(params, flt) } - rcvr := notificationReceiver{ - typ: neorpc.BlockEventID, + params := []interface{}{"block_added"} + if flt != nil { + params = append(params, *flt) + } + r := &naiveReceiver{ + eventID: neorpc.BlockEventID, + filter: flt, + ch: c.Notifications, + } + return c.performSubscription(params, r) +} + +// ReceiveBlocks registers provided channel as a receiver for the new block events. +// Events can be filtered by the given BlockFilter, nil value doesn't add any filter. +// The receiver channel must be properly read and drained after usage in order not +// to block other notification receivers. If multiple subscriptions share the same +// receiver channel, then matching notification is only sent once per channel. The +// receiver channel will be closed by the WSClient immediately after MissedEvent is +// received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"block_added"} + if flt != nil { + params = append(params, *flt) + } + r := &blockReceiver{ filter: flt, - ch: rcvrCh, + ch: rcvr, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) } // SubscribeForNewTransactions adds subscription for new transaction events to // this instance of the client. It can be filtered by the sender and/or the signer, nil // value is treated as missing filter. // -// Deprecated: please, use SubscribeForNewTransactionsWithChan. This method will be removed in future versions. +// Deprecated: please, use ReceiveTransactions. This method will be removed in future versions. func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) { - return c.SubscribeForNewTransactionsWithChan(sender, signer, c.Notifications) -} - -// SubscribeForNewTransactionsWithChan registers provided channel as a receiver -// for new transaction events. Events can be filtered by the sender and/or the -// signer, nil value is treated as missing filter. If the receiver channel is nil, -// then the default Notifications channel will be used. The receiver channel must be -// properly read and drained after usage in order not to block other notification -// receivers. -func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, signer *util.Uint160, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications - } - params := []interface{}{"transaction_added"} var flt *neorpc.TxFilter if sender != nil || signer != nil { flt = &neorpc.TxFilter{Sender: sender, Signer: signer} + } + params := []interface{}{"transaction_added"} + if flt != nil { params = append(params, *flt) } - rcvr := notificationReceiver{ - typ: neorpc.TransactionEventID, - filter: flt, - ch: rcvrCh, + r := &naiveReceiver{ + eventID: neorpc.TransactionEventID, + filter: flt, + ch: c.Notifications, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) +} + +// ReceiveTransactions registers provided channel as a receiver for new transaction +// events. Events can be filtered by the given TxFilter, nil value doesn't add any +// filter. The receiver channel must be properly read and drained after usage in +// order not to block other notification receivers. If multiple subscriptions share +// the same receiver channel, then matching notification is only sent once per channel. +// The receiver channel will be closed by the WSClient immediately after MissedEvent is +// received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveTransactions(flt *neorpc.TxFilter, rcvr chan<- *transaction.Transaction) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"transaction_added"} + if flt != nil { + params = append(params, *flt) + } + r := &txReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) } // SubscribeForExecutionNotifications adds subscription for notifications @@ -463,33 +731,45 @@ func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, sig // filtered by the contract's hash (that emits notifications), nil value puts no such // restrictions. // -// Deprecated: please, use SubscribeForExecutionNotificationsWithChan. This method will be removed in future versions. +// Deprecated: please, use ReceiveExecutionNotifications. This method will be removed in future versions. func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) { - return c.SubscribeForExecutionNotificationsWithChan(contract, name, c.Notifications) -} - -// SubscribeForExecutionNotificationsWithChan registers provided channel as a -// receiver for execution events. Events can be filtered by the contract's hash -// (that emits notifications), nil value puts no such restrictions. If the -// receiver channel is nil, then the default Notifications channel will be used. -// The receiver channel must be properly read and drained after usage in order -// not to block other notification receivers. -func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uint160, name *string, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications - } - params := []interface{}{"notification_from_execution"} var flt *neorpc.NotificationFilter if contract != nil || name != nil { flt = &neorpc.NotificationFilter{Contract: contract, Name: name} + } + params := []interface{}{"notification_from_execution"} + if flt != nil { params = append(params, *flt) } - rcvr := notificationReceiver{ - typ: neorpc.NotificationEventID, - filter: flt, - ch: rcvrCh, + r := &naiveReceiver{ + eventID: neorpc.NotificationEventID, + filter: flt, + ch: c.Notifications, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) +} + +// ReceiveExecutionNotifications registers provided channel as a receiver for execution +// events. Events can be filtered by the given NotificationFilter, nil value doesn't add +// any filter. The receiver channel must be properly read and drained after usage in +// order not to block other notification receivers. If multiple subscriptions share the +// same receiver channel, then matching notification is only sent once per channel. The +// receiver channel will be closed by the WSClient immediately after MissedEvent is +// received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"notification_from_execution"} + if flt != nil { + params = append(params, *flt) + } + r := &executionNotificationReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) } // SubscribeForTransactionExecutions adds subscription for application execution @@ -498,39 +778,56 @@ func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uin // transactions; it can also be filtered by script container hash. // nil value means no filtering. // -// Deprecated: please, use SubscribeForTransactionExecutionsWithChan. This method will be removed in future versions. -func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) { - return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications) -} - -// SubscribeForTransactionExecutionsWithChan registers provided channel as a -// receiver for application execution result events generated during transaction -// execution. Events can be filtered by state (HALT/FAULT) to check for successful -// or failing transactions; it can also be filtered by script container hash. -// nil value means no filtering. If the receiver channel is nil, then the default -// Notifications channel will be used. The receiver channel must be properly read -// and drained after usage in order not to block other notification receivers. -func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications +// Deprecated: please, use ReceiveExecutions. This method will be removed in future versions. +func (c *WSClient) SubscribeForTransactionExecutions(vmState *string, container *util.Uint256) (string, error) { + var flt *neorpc.ExecutionFilter + if vmState != nil || container != nil { + flt = &neorpc.ExecutionFilter{State: vmState, Container: container} } params := []interface{}{"transaction_executed"} - var flt *neorpc.ExecutionFilter - if state != nil || container != nil { - if state != nil { - if *state != "HALT" && *state != "FAULT" { + if flt != nil { + if flt.State != nil { + if *flt.State != "HALT" && *flt.State != "FAULT" { return "", errors.New("bad state parameter") } } - flt = &neorpc.ExecutionFilter{State: state, Container: container} params = append(params, *flt) } - rcvr := notificationReceiver{ - typ: neorpc.ExecutionEventID, - filter: flt, - ch: rcvrCh, + r := &naiveReceiver{ + eventID: neorpc.ExecutionEventID, + filter: flt, + ch: c.Notifications, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) +} + +// ReceiveExecutions registers provided channel as a receiver for +// application execution result events generated during transaction execution. +// Events can be filtered by the given ExecutionFilter, nil value doesn't add any filter. +// The receiver channel must be properly read and drained after usage in order not +// to block other notification receivers. If multiple subscriptions share the same +// receiver channel, then matching notification is only sent once per channel. The +// receiver channel will be closed by the WSClient immediately after MissedEvent is +// received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"transaction_executed"} + if flt != nil { + if flt.State != nil { + if *flt.State != "HALT" && *flt.State != "FAULT" { + return "", errors.New("bad state parameter") + } + } + params = append(params, *flt) + } + r := &executionReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) } // SubscribeForNotaryRequests adds subscription for notary request payloads @@ -538,33 +835,47 @@ func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, cont // request sender's hash, or main tx signer's hash, nil value puts no such // restrictions. // -// Deprecated: please, use SubscribeForNotaryRequestsWithChan. This method will be removed in future versions. +// Deprecated: please, use ReceiveNotaryRequests. This method will be removed in future versions. func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) { - return c.SubscribeForNotaryRequestsWithChan(sender, mainSigner, c.Notifications) -} - -// SubscribeForNotaryRequestsWithChan registers provided channel as a receiver -// for notary request payload addition or removal events. It can be filtered by -// request sender's hash, or main tx signer's hash, nil value puts no such -// restrictions. If the receiver channel is nil, then the default Notifications -// channel will be used. The receiver channel must be properly read and drained -// after usage in order not to block other notification receivers. -func (c *WSClient) SubscribeForNotaryRequestsWithChan(sender *util.Uint160, mainSigner *util.Uint160, rcvrCh chan<- Notification) (string, error) { - if rcvrCh == nil { - rcvrCh = c.Notifications + var flt *neorpc.TxFilter + if sender != nil || mainSigner != nil { + flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner} } params := []interface{}{"notary_request_event"} - var flt *neorpc.TxFilter - if sender != nil { - flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner} + if flt != nil { params = append(params, *flt) } - rcvr := notificationReceiver{ - typ: neorpc.NotaryRequestEventID, - filter: flt, - ch: rcvrCh, + r := &naiveReceiver{ + eventID: neorpc.NotaryRequestEventID, + filter: flt, + ch: c.Notifications, } - return c.performSubscription(params, rcvr) + return c.performSubscription(params, r) +} + +// ReceiveNotaryRequests registers provided channel as a receiver for notary request +// payload addition or removal events. Events can be filtered by the given TxFilter +// where sender corresponds to notary request sender (the second fallback transaction +// signer) and signer corresponds to main transaction signers. nil value doesn't add +// any filter. The receiver channel must be properly read and drained after usage in +// order not to block other notification receivers. If multiple subscriptions share +// the same receiver channel, then matching notification is only sent once per channel. +// The receiver channel will be closed by the WSClient immediately after MissedEvent +// is received from the server; no unsubscription is performed in this case, so it's the +// user responsibility to unsubscribe. +func (c *WSClient) ReceiveNotaryRequests(flt *neorpc.TxFilter, rcvr chan<- *result.NotaryRequestEvent) (string, error) { + if rcvr == nil { + return "", ErrNilNotificationReceiver + } + params := []interface{}{"notary_request_event"} + if flt != nil { + params = append(params, *flt) + } + r := ¬aryRequestReceiver{ + filter: flt, + ch: rcvr, + } + return c.performSubscription(params, r) } // Unsubscribe removes subscription for the given event stream. @@ -578,18 +889,43 @@ func (c *WSClient) UnsubscribeAll() error { defer c.subscriptionsLock.Unlock() for id := range c.subscriptions { - var resp bool - if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil { + err := c.removeSubscription(id) + if err != nil { return err } - if !resp { - return errors.New("unsubscribe method returned false result") - } - delete(c.subscriptions, id) } return nil } +// removeSubscription is internal method that removes subscription with the given +// ID from the list of subscriptions and receivers. It must be performed under +// subscriptions lock. +func (c *WSClient) removeSubscription(id string) error { + var resp bool + if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil { + return err + } + if !resp { + return errors.New("unsubscribe method returned false result") + } + rcvr := c.subscriptions[id] + ch := rcvr.Receiver() + ids := c.receivers[ch] + for i, rcvrID := range ids { + if rcvrID == id { + ids = append(ids[:i], ids[i+1:]...) + break + } + } + if len(ids) == 0 { + delete(c.receivers, ch) + } else { + c.receivers[ch] = ids + } + delete(c.subscriptions, id) + return nil +} + // setCloseErr is a thread-safe method setting closeErr in case if it's not yet set. func (c *WSClient) setCloseErr(err error) { c.closeErrLock.Lock() diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index e0ea0fa8f..1d7f3f774 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -15,11 +15,16 @@ import ( "github.com/gorilla/websocket" "github.com/nspcc-dev/neo-go/pkg/config/netmode" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/services/rpcsrv/params" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" ) @@ -32,31 +37,26 @@ func TestWSClientClose(t *testing.T) { } func TestWSClientSubscription(t *testing.T) { - ch := make(chan Notification) + bCh := make(chan *block.Block) + txCh := make(chan *transaction.Transaction) + aerCh := make(chan *state.AppExecResult) + ntfCh := make(chan *state.ContainedNotificationEvent) + ntrCh := make(chan *result.NotaryRequestEvent) var cases = map[string]func(*WSClient) (string, error){ "blocks": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, nil) - }, - "blocks_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch) + return wsc.ReceiveBlocks(nil, bCh) }, "transactions": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewTransactionsWithChan(nil, nil, nil) - }, - "transactions_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewTransactionsWithChan(nil, nil, ch) + return wsc.ReceiveTransactions(nil, txCh) }, "notifications": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, nil) - }, - "notifications_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch) + return wsc.ReceiveExecutionNotifications(nil, ntfCh) }, "executions": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, nil) + return wsc.ReceiveExecutions(nil, aerCh) }, - "executions_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch) + "notary requests": func(wsc *WSClient) (string, error) { + return wsc.ReceiveNotaryRequests(nil, ntrCh) }, } t.Run("good", func(t *testing.T) { @@ -96,13 +96,13 @@ func TestWSClientUnsubscription(t *testing.T) { var cases = map[string]responseCheck{ "good": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = notificationReceiver{} + wsc.subscriptions["0"] = &blockReceiver{} err := wsc.Unsubscribe("0") require.NoError(t, err) }}, "all": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = notificationReceiver{} + wsc.subscriptions["0"] = &blockReceiver{} err := wsc.UnsubscribeAll() require.NoError(t, err) require.Equal(t, 0, len(wsc.subscriptions)) @@ -113,13 +113,13 @@ func TestWSClientUnsubscription(t *testing.T) { }}, "error returned": {`{"jsonrpc": "2.0", "id": 1, "error":{"code":-32602,"message":"Invalid Params"}}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = notificationReceiver{} + wsc.subscriptions["0"] = &blockReceiver{} err := wsc.Unsubscribe("0") require.Error(t, err) }}, "false returned": {`{"jsonrpc": "2.0", "id": 1, "result": false}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = notificationReceiver{} + wsc.subscriptions["0"] = &blockReceiver{} err := wsc.Unsubscribe("0") require.Error(t, err) }}, @@ -144,7 +144,7 @@ func TestWSClientEvents(t *testing.T) { `{"jsonrpc":"2.0","method":"notification_from_execution","params":[{"container":"0xe1cd5e57e721d2a2e05fb1f08721b12057b25ab1dd7fd0f33ee1639932fdfad7","contract":"0x1b4357bff5a01bdf2a6581247cf9ed1e24629176","eventname":"contract call","state":{"type":"Array","value":[{"type":"ByteString","value":"dHJhbnNmZXI="},{"type":"Array","value":[{"type":"ByteString","value":"dpFiJB7t+XwkgWUq3xug9b9XQxs="},{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"Integer","value":"1000"}]}]}}]}`, `{"jsonrpc":"2.0","method":"transaction_executed","params":[{"container":"0xf97a72b7722c109f909a8bc16c22368c5023d85828b09b127b237aace33cf099","trigger":"Application","vmstate":"HALT","gasconsumed":"6042610","stack":[],"notifications":[{"contract":"0xe65ff7b3a02d207b584a5c27057d4e9862ef01da","eventname":"contract call","state":{"type":"Array","value":[{"type":"ByteString","value":"dHJhbnNmZXI="},{"type":"Array","value":[{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"ByteString","value":"IHKCdK+vw29DoHHTKM+j5inZy7A="},{"type":"Integer","value":"123"}]}]}},{"contract":"0xe65ff7b3a02d207b584a5c27057d4e9862ef01da","eventname":"transfer","state":{"type":"Array","value":[{"type":"ByteString","value":"MW6FEDkBnTnfwsN9bD/uGf1YCYc="},{"type":"ByteString","value":"IHKCdK+vw29DoHHTKM+j5inZy7A="},{"type":"Integer","value":"123"}]}}]}]}`, fmt.Sprintf(`{"jsonrpc":"2.0","method":"block_added","params":[%s]}`, b1Verbose), - `{"jsonrpc":"2.0","method":"event_missed","params":[]}`, + `{"jsonrpc":"2.0","method":"event_missed","params":[]}`, // the last one, will trigger receiver channels closing. } srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if req.URL.Path == "/ws" && req.Method == "GET" { @@ -163,109 +163,123 @@ func TestWSClientEvents(t *testing.T) { return } })) + wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) + require.NoError(t, err) + wsc.getNextRequestID = getTestRequestID + wsc.cacheLock.Lock() + wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. + wsc.cache.network = netmode.UnitTestNet + wsc.cacheLock.Unlock() - t.Run("default ntf channel", func(t *testing.T) { - wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) - require.NoError(t, err) - wsc.getNextRequestID = getTestRequestID - wsc.cacheLock.Lock() - wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. - wsc.cache.network = netmode.UnitTestNet - wsc.cacheLock.Unlock() - // Our server mock is restricted, so perform subscriptions manually with default notifications channel. - wsc.subscriptionsLock.Lock() - wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} - wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} - wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications} - // MissedEvent must be delivered without subscription. - wsc.subscriptionsLock.Unlock() - for range events { - select { - case _, ok = <-wsc.Notifications: - case <-time.After(time.Second): - t.Fatal("timeout waiting for event") - } - require.True(t, ok) - } + // Our server mock is restricted, so perform subscriptions manually with default notifications channel. + bCh1 := make(chan *block.Block) + bCh2 := make(chan *block.Block) + aerCh1 := make(chan *state.AppExecResult) + aerCh2 := make(chan *state.AppExecResult) + aerCh3 := make(chan *state.AppExecResult) + ntfCh := make(chan *state.ContainedNotificationEvent) + halt := "HALT" + fault := "FAULT" + wsc.subscriptionsLock.Lock() + wsc.subscriptions["0"] = &blockReceiver{ch: bCh1} + wsc.receivers[chan<- *block.Block(bCh1)] = []string{"0"} + wsc.subscriptions["1"] = &blockReceiver{ch: bCh2} // two different channels subscribed for same notifications + wsc.receivers[chan<- *block.Block(bCh2)] = []string{"1"} + + wsc.subscriptions["2"] = &executionNotificationReceiver{ch: ntfCh} + wsc.subscriptions["3"] = &executionNotificationReceiver{ch: ntfCh} // check duplicating subscriptions + wsc.receivers[chan<- *state.ContainedNotificationEvent(ntfCh)] = []string{"2", "3"} + + wsc.subscriptions["4"] = &executionReceiver{ch: aerCh1} + wsc.receivers[chan<- *state.AppExecResult(aerCh1)] = []string{"4"} + wsc.subscriptions["5"] = &executionReceiver{filter: &neorpc.ExecutionFilter{State: &halt}, ch: aerCh2} + wsc.receivers[chan<- *state.AppExecResult(aerCh2)] = []string{"5"} + wsc.subscriptions["6"] = &executionReceiver{filter: &neorpc.ExecutionFilter{State: &fault}, ch: aerCh3} + wsc.receivers[chan<- *state.AppExecResult(aerCh3)] = []string{"6"} + // MissedEvent must close the channels above. + + wsc.subscriptions["7"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications} + wsc.subscriptions["8"] = &naiveReceiver{eventID: neorpc.BlockEventID, ch: wsc.Notifications} // check duplicating subscriptions + wsc.subscriptions["9"] = &naiveReceiver{eventID: neorpc.ExecutionEventID, ch: wsc.Notifications} // check different events + wsc.receivers[wsc.Notifications] = []string{"7", "8", "9"} + wsc.subscriptionsLock.Unlock() + + var ( + b1Cnt, b2Cnt int + aer1Cnt, aer2Cnt, aer3Cnt int + ntfCnt int + defaultCount int + expectedb1Cnt, expectedb2Cnt = 1, 1 // single Block event + expectedaer1Cnt, expectedaer2Cnt, expectedaer3Cnt = 2, 2, 0 // two HALTED AERs + expectedntfCnt = 1 // single notification event + expectedDefaultCnt = 1 + 2 + 1 // single Block event + two AERs + missed event + aer *state.AppExecResult + ) + for b1Cnt+b2Cnt+ + aer1Cnt+aer2Cnt+aer3Cnt+ + ntfCnt+ + defaultCount != + expectedb1Cnt+expectedb2Cnt+ + expectedaer1Cnt+expectedaer2Cnt+expectedaer3Cnt+ + expectedntfCnt+ + expectedDefaultCnt { select { + case _, ok = <-bCh1: + if ok { + b1Cnt++ + } + case _, ok = <-bCh2: + if ok { + b2Cnt++ + } + case _, ok = <-aerCh1: + if ok { + aer1Cnt++ + } + case aer, ok = <-aerCh2: + if ok { + require.Equal(t, vmstate.Halt, aer.VMState) + aer2Cnt++ + } + case _, ok = <-aerCh3: + if ok { + aer3Cnt++ + } + case _, ok = <-ntfCh: + if ok { + ntfCnt++ + } case _, ok = <-wsc.Notifications: + if ok { + defaultCount++ + } case <-time.After(time.Second): t.Fatal("timeout waiting for event") } - // Connection closed by server. - require.False(t, ok) - }) - t.Run("multiple ntf channels", func(t *testing.T) { - wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) - require.NoError(t, err) - wsc.getNextRequestID = getTestRequestID - wsc.cacheLock.Lock() - wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. - wsc.cache.network = netmode.UnitTestNet - wsc.cacheLock.Unlock() + } + assert.Equal(t, expectedb1Cnt, b1Cnt) + assert.Equal(t, expectedb2Cnt, b2Cnt) + assert.Equal(t, expectedaer1Cnt, aer1Cnt) + assert.Equal(t, expectedaer2Cnt, aer2Cnt) + assert.Equal(t, expectedaer3Cnt, aer3Cnt) + assert.Equal(t, expectedntfCnt, ntfCnt) + assert.Equal(t, expectedDefaultCnt, defaultCount) - // Our server mock is restricted, so perform subscriptions manually with default notifications channel. - ch1 := make(chan Notification) - ch2 := make(chan Notification) - ch3 := make(chan Notification) - halt := "HALT" - fault := "FAULT" - wsc.subscriptionsLock.Lock() - wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} - wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} - wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications} - wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1} - wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} - wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions - wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &halt}, ch: ch2} - wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &fault}, ch: ch3} - // MissedEvent must be delivered without subscription. - wsc.subscriptionsLock.Unlock() - - var ( - defaultChCnt int - ch1Cnt int - ch2Cnt int - ch3Cnt int - expectedDefaultCnCount = len(events) - expectedCh1Cnt = 1 + 1 // Block event + Missed event - expectedCh2Cnt = 1 + 2 + 1 // Notification event + 2 Execution events + Missed event - expectedCh3Cnt = 1 // Missed event - ntf Notification - ) - for i := 0; i < expectedDefaultCnCount+expectedCh1Cnt+expectedCh2Cnt+expectedCh3Cnt; i++ { - select { - case ntf, ok = <-wsc.Notifications: - defaultChCnt++ - case ntf, ok = <-ch1: - require.True(t, ntf.Type == neorpc.BlockEventID || ntf.Type == neorpc.MissedEventID, ntf.Type) - ch1Cnt++ - case ntf, ok = <-ch2: - require.True(t, ntf.Type == neorpc.NotificationEventID || ntf.Type == neorpc.MissedEventID || ntf.Type == neorpc.ExecutionEventID) - ch2Cnt++ - case ntf, ok = <-ch3: - require.True(t, ntf.Type == neorpc.MissedEventID) - ch3Cnt++ - case <-time.After(time.Second): - t.Fatal("timeout waiting for event") - } - require.True(t, ok) - } - select { - case _, ok = <-wsc.Notifications: - case _, ok = <-ch1: - case _, ok = <-ch2: - case _, ok = <-ch3: - case <-time.After(time.Second): - t.Fatal("timeout waiting for event") - } - // Connection closed by server. - require.False(t, ok) - require.Equal(t, expectedDefaultCnCount, defaultChCnt) - require.Equal(t, expectedCh1Cnt, ch1Cnt) - require.Equal(t, expectedCh2Cnt, ch2Cnt) - require.Equal(t, expectedCh3Cnt, ch3Cnt) - }) + // Channels must be closed by server + _, ok = <-bCh1 + require.False(t, ok) + _, ok = <-bCh2 + require.False(t, ok) + _, ok = <-aerCh1 + require.False(t, ok) + _, ok = <-aerCh2 + require.False(t, ok) + _, ok = <-aerCh3 + require.False(t, ok) + _, ok = <-ntfCh + require.False(t, ok) + _, ok = <-ntfCh + require.False(t, ok) } func TestWSExecutionVMStateCheck(t *testing.T) { @@ -276,12 +290,16 @@ func TestWSExecutionVMStateCheck(t *testing.T) { wsc.getNextRequestID = getTestRequestID require.NoError(t, wsc.Init()) filter := "NONE" - _, err = wsc.SubscribeForTransactionExecutionsWithChan(&filter, nil, nil) + _, err = wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &filter}, make(chan *state.AppExecResult)) require.Error(t, err) wsc.Close() } func TestWSFilteredSubscriptions(t *testing.T) { + bCh := make(chan *block.Block) + txCh := make(chan *transaction.Transaction) + aerCh := make(chan *state.AppExecResult) + ntfCh := make(chan *state.ContainedNotificationEvent) var cases = []struct { name string clientCode func(*testing.T, *WSClient) @@ -290,7 +308,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks primary", func(t *testing.T, wsc *WSClient) { primary := 3 - _, err := wsc.SubscribeForNewBlocksWithChan(&primary, nil, nil, nil) + _, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Primary: &primary}, bCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -305,7 +323,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks since", func(t *testing.T, wsc *WSClient) { var since uint32 = 3 - _, err := wsc.SubscribeForNewBlocksWithChan(nil, &since, nil, nil) + _, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -320,7 +338,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks till", func(t *testing.T, wsc *WSClient) { var till uint32 = 3 - _, err := wsc.SubscribeForNewBlocksWithChan(nil, nil, &till, nil) + _, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{Till: &till}, bCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -339,7 +357,11 @@ func TestWSFilteredSubscriptions(t *testing.T) { primary = 2 till uint32 = 5 ) - _, err := wsc.SubscribeForNewBlocksWithChan(&primary, &since, &till, nil) + _, err := wsc.ReceiveBlocks(&neorpc.BlockFilter{ + Primary: &primary, + Since: &since, + Till: &till, + }, bCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -354,7 +376,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"transactions sender", func(t *testing.T, wsc *WSClient) { sender := util.Uint160{1, 2, 3, 4, 5} - _, err := wsc.SubscribeForNewTransactionsWithChan(&sender, nil, nil) + _, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender}, txCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -368,7 +390,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"transactions signer", func(t *testing.T, wsc *WSClient) { signer := util.Uint160{0, 42} - _, err := wsc.SubscribeForNewTransactionsWithChan(nil, &signer, nil) + _, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Signer: &signer}, txCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -383,7 +405,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { sender := util.Uint160{1, 2, 3, 4, 5} signer := util.Uint160{0, 42} - _, err := wsc.SubscribeForNewTransactionsWithChan(&sender, &signer, nil) + _, err := wsc.ReceiveTransactions(&neorpc.TxFilter{Sender: &sender, Signer: &signer}, txCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -397,7 +419,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"notifications contract hash", func(t *testing.T, wsc *WSClient) { contract := util.Uint160{1, 2, 3, 4, 5} - _, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, nil, nil) + _, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, ntfCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -411,7 +433,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"notifications name", func(t *testing.T, wsc *WSClient) { name := "my_pretty_notification" - _, err := wsc.SubscribeForExecutionNotificationsWithChan(nil, &name, nil) + _, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Name: &name}, ntfCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -426,7 +448,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { contract := util.Uint160{1, 2, 3, 4, 5} name := "my_pretty_notification" - _, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, &name, nil) + _, err := wsc.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract, Name: &name}, ntfCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -440,7 +462,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"executions state", func(t *testing.T, wsc *WSClient) { state := "FAULT" - _, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, nil, nil) + _, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &state}, aerCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -454,7 +476,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"executions container", func(t *testing.T, wsc *WSClient) { container := util.Uint256{1, 2, 3} - _, err := wsc.SubscribeForTransactionExecutionsWithChan(nil, &container, nil) + _, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &container}, aerCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -469,7 +491,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { state := "FAULT" container := util.Uint256{1, 2, 3} - _, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, &container, nil) + _, err := wsc.ReceiveExecutions(&neorpc.ExecutionFilter{State: &state, Container: &container}, aerCh) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { From 581168783607ad01988eca18c78a98d913b1359d Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 26 Oct 2022 10:31:25 +0300 Subject: [PATCH 4/6] rpc: fix bug in Actor's waiter Execution events are followed by block events, not vise versa, thus, we can wait until VUB block to be accepted to be sure that transaction wasn't accepted to chain. --- pkg/rpcclient/actor/waiter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 4eae26f81..0e86aebb1 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -244,8 +244,8 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui close(aerRcvr) } }() - // Execution event follows the block event, thus wait until the block next to the VUB to be sure. - since := vub + 1 + // Execution event precedes the block event, thus wait until the VUB-th block to be sure. + since := vub blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) From f5441f60851feb046e2f6b03f5476c94794d87e5 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 26 Oct 2022 13:06:57 +0300 Subject: [PATCH 5/6] docs: fix doc for `transaction_executed` WS notification It returns *state.AppExecResult. --- docs/notifications.md | 103 ++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 53 deletions(-) diff --git a/docs/notifications.md b/docs/notifications.md index 740dfb392..741d4a200 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -347,8 +347,10 @@ Example: ### `transaction_executed` notification It contains the same result as from `getapplicationlog` method in the first -parameter and no other parameters. The only difference from `getapplicationlog` is -that it always contains zero in the `contract` field. +parameter and no other parameters. The difference from `getapplicationlog` is +that it has block's or transaction's hex-encoded LE Uint256 hash in the `container` +field instead of two separate `txid` and `blockhash` fields and a single execution +instead of an executions array. Example: ``` @@ -357,61 +359,31 @@ Example: "params" : [ { "container" : "0xe1cd5e57e721d2a2e05fb1f08721b12057b25ab1dd7fd0f33ee1639932fdfad7", - "executions" : [ + "trigger" : "Application", + "gasconsumed" : "2.291", + "stack" : [], + "notifications" : [ { - "trigger" : "Application", - "gasconsumed" : "2.291", - "contract" : "0x0000000000000000000000000000000000000000", - "stack" : [], - "notifications" : [ - { - "state" : { - "type" : "Array", - "value" : [ - { - "value" : "636f6e74726163742063616c6c", - "type" : "ByteString" - }, - { - "type" : "ByteString", - "value" : "7472616e73666572" - }, - { - "value" : [ - { - "value" : "769162241eedf97c2481652adf1ba0f5bf57431b", - "type" : "ByteString" - }, - { - "type" : "ByteString", - "value" : "316e851039019d39dfc2c37d6c3fee19fd580987" - }, - { - "value" : "1000", - "type" : "Integer" - } - ], - "type" : "Array" - } - ] + "state" : { + "type" : "Array", + "value" : [ + { + "value" : "636f6e74726163742063616c6c", + "type" : "ByteString" }, - "contract" : "0x1b4357bff5a01bdf2a6581247cf9ed1e24629176" - }, - { - "contract" : "0x1b4357bff5a01bdf2a6581247cf9ed1e24629176", - "state" : { + { + "type" : "ByteString", + "value" : "7472616e73666572" + }, + { "value" : [ - { - "value" : "7472616e73666572", - "type" : "ByteString" - }, { "value" : "769162241eedf97c2481652adf1ba0f5bf57431b", "type" : "ByteString" }, { - "value" : "316e851039019d39dfc2c37d6c3fee19fd580987", - "type" : "ByteString" + "type" : "ByteString", + "value" : "316e851039019d39dfc2c37d6c3fee19fd580987" }, { "value" : "1000", @@ -420,11 +392,36 @@ Example: ], "type" : "Array" } - } - ], - "vmstate" : "HALT" + ] + }, + "contract" : "0x1b4357bff5a01bdf2a6581247cf9ed1e24629176" + }, + { + "contract" : "0x1b4357bff5a01bdf2a6581247cf9ed1e24629176", + "state" : { + "value" : [ + { + "value" : "7472616e73666572", + "type" : "ByteString" + }, + { + "value" : "769162241eedf97c2481652adf1ba0f5bf57431b", + "type" : "ByteString" + }, + { + "value" : "316e851039019d39dfc2c37d6c3fee19fd580987", + "type" : "ByteString" + }, + { + "value" : "1000", + "type" : "Integer" + } + ], + "type" : "Array" + } } - ] + ], + "vmstate" : "HALT" } ], "jsonrpc" : "2.0" From 4fc11c29240feb43211f4f24585f4bc66172a735 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 26 Oct 2022 13:22:12 +0300 Subject: [PATCH 6/6] rpc: revert deprecated subscriptions API changes Revert deprecated subscriptions-related method signature changed in 673a495527602e754ad5670e3f8a097046d1cbd9, 8e84bb51d54ef3f22d5d1e58524884f6016bf1ce and d7c1f3eac7da319dd0be6d79fdd7ecff8ee59add. --- pkg/rpcclient/wsclient.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 236bd9fa0..1a4c7cd3a 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -636,15 +636,14 @@ func (c *WSClient) performUnsubscription(id string) error { } // SubscribeForNewBlocks adds subscription for new block events to this instance -// of the client. It can be filtered by primary consensus node index and/or block -// index allowing to receive blocks since/till the specified index only, nil value -// is treated as missing filter. +// of the client. It can be filtered by primary consensus node index, nil value doesn't +// add any filters. // // Deprecated: please, use ReceiveBlocks. This method will be removed in future versions. -func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) { +func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) { var flt *neorpc.BlockFilter - if primary != nil || sinceIndex != nil || tillIndex != nil { - flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex} + if primary != nil { + flt = &neorpc.BlockFilter{Primary: primary} } params := []interface{}{"block_added"} if flt != nil { @@ -775,14 +774,13 @@ func (c *WSClient) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, // SubscribeForTransactionExecutions adds subscription for application execution // results generated during transaction execution to this instance of the client. It can // be filtered by state (HALT/FAULT) to check for successful or failing -// transactions; it can also be filtered by script container hash. -// nil value means no filtering. +// transactions, nil value means no filtering. // // Deprecated: please, use ReceiveExecutions. This method will be removed in future versions. -func (c *WSClient) SubscribeForTransactionExecutions(vmState *string, container *util.Uint256) (string, error) { +func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) { var flt *neorpc.ExecutionFilter - if vmState != nil || container != nil { - flt = &neorpc.ExecutionFilter{State: vmState, Container: container} + if state != nil { + flt = &neorpc.ExecutionFilter{State: state} } params := []interface{}{"transaction_executed"} if flt != nil {